Source code for pyrodeo.hydro

# -*- coding: utf-8 -*-
"""Defines the Hydro class performing hydrodynamic updates of the state.
"""

from __future__ import print_function

import numpy as np
from pyrodeo.state import State
from pyrodeo.linear_advection import LinearAdvection
from pyrodeo.roe import Roe

[docs]class Hydro: """Construct class for hydrodynamic updates. Construct from existing instances of :class:`.Param` and :class:`.Coordinates`. It constructs a :class:`.LinearAdvection` instance which will deal with orbital advection, and a :class:`.Roe` instance dealing with hydrodynamics of residual velocities. Args: param (:class:`.Param`): Valid Param object, containing simulation parameters. coords (:class:`.Coordinates`): Valid Coordinates object, containing x, y and z coordinates. Used to calculate orbital advection velocity. The following attributes and methods are available: Attributes: orbital_advection (:class:`.LinearAdvection`): Instance of :class:`.LinearAdvection` class used to do orbital advection. roe (:class:`.Roe`): Instance of :class:`.Roe` class dealing with hydrodynamics of residual velocities. """ def __init__(self, param, coords): v_adv = 0.0*coords.x if param.geometry == 'sheet': v_adv = (-1.5*coords.x).transpose((1,0,2)) if param.geometry == 'cyl': v_adv = (np.power(coords.x, -1.5) - param.frame_rotation).transpose((1,0,2)) if param.geometry == 'sph': v_adv = (np.power(coords.x, -1.5)/np.sin(coords.z) - param.frame_rotation).transpose((1,0,2)) self.orbital_advection = LinearAdvection(v_adv, param.limiter_param) self.roe = Roe(param.limiter_param, param.min_dens)
[docs] def calc_time_step(self, geometry, coords, state, log_radial=False): """Calculate time step obeying the CFL condition. Args: geometry (str): 'cart', 'sheet' or 'cyl'. coords (:class:`.Coordinates`): Valid :class:`.Coordinates` object, containing x, y and z coordinates. state (:class:`.State`): Valid :class:`.State` object, containing density and velocity. log_radial (:obj:`bool`, optional): Flag indicating whether a logarithmic radial coordinate is used Returns: float: Maximum time step obeying the CFL condition. """ cs = state.soundspeed dtx = 1.0e10 dty = dtx dtz = dtx if len(state.dens[:,0,0]) > 1: abs_speed = (np.abs(state.velx) + cs)*state.no_ghost if log_radial is True: abs_speed /= coords.x dtx = coords.dxyz[0]/np.max(abs_speed) if len(state.dens[0,:,0]) > 1: abs_speed = (np.abs(state.vely) + cs)*state.no_ghost if geometry == 'cyl': abs_speed /= coords.x if geometry == 'sph': abs_speed /= (coords.x*np.sin(coords.z)) dty = coords.dxyz[1]/np.max(abs_speed) if len(state.dens[0,0,:]) > 1: abs_speed = (np.abs(state.velz) + cs)*state.no_ghost if geometry == 'sph': abs_speed /= coords.x dtz = coords.dxyz[2]/np.max(abs_speed) return np.min([dtx, dty, dtz])
[docs] def shear_periodic_boundaries(self, t, coords, state): """Set shear-periodic boundary conditions. In the shearing sheet geometry, the x direction can be quasi-periodic, i.e. periodic but modified for the shear. Imagine neighbouring sheets shearing past the center sheet. Args: t (float): Current simulation time. Used to calculate over what distance neighbouring sheets have shifted since t = 0. coords (:class:`.Coordinates`): Valid :class:`.Coordinates` object, containing x and y coordinates. state (:class:`.State`): Valid :class:`.State` object, containing density and velocity. """ # Single cell in x; nothing to do if len(state.dens[:,0,0]) <= 1: return # Create State made just of ghost zones dens = np.concatenate((state.dens[-4:-2,:,:], state.dens[2:4,:,:])) velx = np.concatenate((state.velx[-4:-2,:,:], state.velx[2:4,:,:])) vely = np.concatenate((state.vely[-4:-2,:,:], state.vely[2:4,:,:])) velz = np.concatenate((state.velz[-4:-2,:,:], state.velz[2:4,:,:])) temp_state = State(dens, velx, vely, velz, velz) # Advection velocity is speed of next box v_adv = 0.0*dens + 1.5*(coords.x[-2:-1,0,0] + coords.x[-3:-2,0,0]) v_adv[2:4,:] = -v_adv[2:4,:,:] # Linear advection since t = 0 when solution was periodic temp_state.transpose((1,0,2)) v_adv = v_adv.transpose((1,0,2)) la = LinearAdvection(v_adv, self.orbital_advection.sb) la.step(t, coords.dxyz[1], temp_state) temp_state.transpose((1,0,2)) # Store in ghost zones state.dens[:2,:,:] = temp_state.dens[:2,:,:] state.velx[:2,:,:] = temp_state.velx[:2,:,:] state.vely[:2,:,:] = temp_state.vely[:2,:,:] state.velz[:2,:,:] = temp_state.velz[:2,:,:] state.dens[-2:,:,:] = temp_state.dens[2:,:,:] state.velx[-2:,:,:] = temp_state.velx[2:,:,:] state.vely[-2:,:,:] = temp_state.vely[2:,:,:] state.velz[-2:,:,:] = temp_state.velz[2:,:,:]
[docs] def preprocess(self, coords, param, state, direction): """Modify state to quasi-cartesian form and calculate geometric source terms. Isothermal hydrodynamics allows for a generic form of the equations in all geometries, subject only to modifications in the geometric source terms. Args: coords (:class:`.Coordinates`): Valid :class:`.Coordinates` object, containing x and y coordinates. param (:class:`.Param`): Valid :class:`.Param` object, containing simulation parameters. state (:class:`.State`): Valid :class:`.State` object, containing density and velocity. direction (int): 0 (integrating x) or 1 (integrating y). Returns: ndarray: Geometric source term of the same shape as state.dens. """ source = 0.0*state.dens if param.geometry == 'sheet': if direction == 0: state.vely += 0.5*param.frame_rotation*coords.x source = 2.0*state.dens*param.frame_rotation*\ (state.vely - 0.5*param.frame_rotation*coords.x) if direction == 2: source = -state.dens*param.frame_rotation*\ param.frame_rotation*coords.z if param.geometry == 'cyl': if direction == 0: state.dens *= coords.x state.vely = coords.x*state.vely + np.sqrt(coords.x) dpot = coords.x*np.power(coords.x*coords.x + coords.z*coords.z, -1.5) source = \ state.dens*state.vely*state.vely/ \ (coords.x*coords.x*coords.x) - \ state.dens*dpot + \ state.soundspeed*state.soundspeed*state.dens/coords.x if param.log_radial is True: state.velx = state.velx/coords.x state.dens *= coords.x state.soundspeed /= coords.x source = source - state.dens*state.velx*state.velx - \ state.soundspeed*state.soundspeed*state.dens if direction == 1: state.vely /= coords.x state.soundspeed /= coords.x if direction == 2: dpot = coords.z*np.power(coords.x*coords.x + coords.z*coords.z, -1.5) source = -state.dens*dpot if param.geometry == 'sph': if direction == 0: c = state.soundspeed state.dens *= coords.x*coords.x state.vely = state.vely*coords.x + np.sqrt(coords.x) state.velz *= coords.x source = state.dens*(((state.vely*state.vely + state.velz*state.velz)/coords.x - 1.0)/(coords.x*coords.x) + 2.0*c*c/coords.x) if param.log_radial is True: state.velx = state.velx/coords.x state.dens *= coords.x state.soundspeed /= coords.x source = source - state.dens*state.velx*state.velx - \ state.soundspeed*state.soundspeed*state.dens if direction == 1: g = 1.0/(coords.x*np.sin(coords.z)) state.vely *= g state.soundspeed *= g if direction == 2: sint = np.sin(coords.z) state.dens *= sint state.vely = sint*(state.vely/coords.x + np.power(coords.x, -1.5)) state.velz /= coords.x state.soundspeed /= coords.x cott = np.cos(coords.z)/sint source = state.dens*cott*(state.vely*state.vely/(sint*sint) + state.soundspeed*state.soundspeed) if direction == 1: source = source.transpose((1,0,2)) state.transpose((1,0,2)) state.swap_velocities(1) if direction == 2: source = source.transpose((2,1,0)) state.transpose((2,1,0)) state.swap_velocities(2) return source
[docs] def postprocess(self, coords, param, state, direction): """Inverse of :meth:`.preprocess`. Reverse modifications by :meth:`.preprocess`. Args: coords (:class:`.Coordinates`): Valid :class:`.Coordinates` object, containing x and y coordinates. param (:class:`.Param`): Valid :class:`.Param` object, containing simulation parameters. state (:class:`.State`): Valid :class:`.State` object, containing density and velocity. direction (int): 0 (integrating x) or 1 (integrating y). """ if direction == 1: state.transpose((1,0,2)) state.swap_velocities(1) if direction == 2: state.transpose((2,1,0)) state.swap_velocities(2) if param.geometry == 'sheet': if direction == 0: state.vely -= 0.5*param.frame_rotation*coords.x if param.geometry == 'cyl': if direction == 0: state.dens /= coords.x state.vely = state.vely/coords.x - np.power(coords.x, -0.5) if param.log_radial is True: state.velx *= coords.x state.dens /= coords.x state.soundspeed *= coords.x if direction == 1: state.vely *= coords.x state.soundspeed *= coords.x if param.geometry == 'sph': if direction == 0: state.dens /= (coords.x*coords.x) state.vely = (state.vely - np.sqrt(coords.x))/coords.x state.velz /= coords.x if param.log_radial is True: state.velx *= coords.x state.dens /= coords.x state.soundspeed *= coords.x if direction == 1: g = coords.x*np.sin(coords.z) state.vely *= g state.soundspeed *= g if direction == 2: sint = np.sin(coords.z) state.dens /= sint state.vely = state.vely*coords.x/sint - \ np.power(coords.x, -0.5) state.velz *= coords.x state.soundspeed *= coords.x
[docs] def evolve(self, t, t_max, coords, param, state, source_func, source_param): """Evolve state from t to tmax. Args: t (float): Current simulation time. t_max (float): Simulation time to reach before stopping. coords (:class:`.Coordinates`): Valid :class:`.Coordinates` object, containing x and y coordinates. param (:class:`.Param`): Valid :class:`.Param` object, containing simulation parameters. state (:class:`.State`): Valid :class:`.State` object, containing density and velocity. source_func (callable): Function integrating any extra source terms (non-geometric). It should accept the following arguments: t, dt, coords, state, source_param. source_param (array-like): Extra parameters for source_func. Returns: (tuple): Tuple consisting of: t (float): new simulation time (= t_max if no problems encountered). :class:`.State`: Updated :class:`.State`. """ # Evolve until t = t_max while (t < t_max): # Calculate time step dt = param.courant*self.calc_time_step(param.geometry, coords, state, param.log_radial) # End exactly on t_max if (t + dt > t_max): dt = t_max - t # Copy state in case something goes wrong old_state = State.copy(state) # Continue until everything OK success = False while success is False: print("Current time: {}, time step: {}".format(t, dt)) # Shear periodic x boundaries if necessary if param.boundaries[0][0] == 'shear periodic': self.shear_periodic_boundaries(t, coords, state) # Dimensional split: do all dimensions independently for dim in (0,1,2): if np.shape(state.dens)[dim] > 1: source = self.preprocess(coords, param, state, dim) # Hydrodynamic update self.roe.step(dt, coords.dxyz[dim], state, source, param.boundaries[dim]) # Orbital advection if dim == 1: self.orbital_advection.step(dt, coords.dxyz[dim], state) self.postprocess(coords, param, state, dim) # Check if density positive success = True mindens = np.min(state.dens) # If not, restore state and try with smaller time step if mindens < 0.0: state = State.copy(old_state) dt = 0.5*dt success = False # Integrate extra source terms (ignoring ghost zones!) if source_func is not None: source_func(t, dt, coords, state, source_param) # Update simulation time t = t + dt return t, state