Source code for fdtd.grid

""" The FDTD Grid

The grid is the core of the FDTD Library. It is where everything comes
together and where the biggest part of the calculations are done.


## Imports

# standard library
import os
from math import pi
from os import path, makedirs, chdir, remove
from subprocess import check_call, CalledProcessError
from glob import glob
from datetime import datetime

# 3rd party
from tqdm import tqdm
from numpy import savez

# typing
from .typing_ import Tuple, Number, Tensorlike

# relative
from .backend import backend as bd

## Constants
SPEED_LIGHT: float = 299_792_458.0  # [m/s] speed of light
VACUUM_PERMEABILITY: float = 4e-7 * pi  # vacuum permeability
VACUUM_PERMITTIVITY: float = 1.0 / (
)  # vacuum permittivity

[docs]class d_: ''' Just a convenience function for indexing polarizations of arrays. ''' X = 0 Y = 1 Z = 2
## Functions
[docs]def curl_E(E: Tensorlike) -> Tensorlike: """Transforms an E-type field into an H-type field by performing a curl operation Args: E: Electric field to take the curl of (E-type field located on the edges of the grid cell [integer gridpoints]) Returns: The curl of E (H-type field located on the faces of the grid [half-integer grid points]) """ curl = bd.zeros(E.shape) curl[:, :-1, :, 0] += E[:, 1:, :, 2] - E[:, :-1, :, 2] curl[:, :, :-1, 0] -= E[:, :, 1:, 1] - E[:, :, :-1, 1] curl[:, :, :-1, 1] += E[:, :, 1:, 0] - E[:, :, :-1, 0] curl[:-1, :, :, 1] -= E[1:, :, :, 2] - E[:-1, :, :, 2] curl[:-1, :, :, 2] += E[1:, :, :, 1] - E[:-1, :, :, 1] curl[:, :-1, :, 2] -= E[:, 1:, :, 0] - E[:, :-1, :, 0] return curl
[docs]def curl_H(H: Tensorlike) -> Tensorlike: """Transforms an H-type field into an E-type field by performing a curl operation Args: H: Magnetic field to take the curl of (H-type field located on half-integer grid points) Returns: The curl of H (E-type field located on the edges of the grid [integer grid points]) """ curl = bd.zeros(H.shape) curl[:, 1:, :, 0] += H[:, 1:, :, 2] - H[:, :-1, :, 2] curl[:, :, 1:, 0] -= H[:, :, 1:, 1] - H[:, :, :-1, 1] curl[:, :, 1:, 1] += H[:, :, 1:, 0] - H[:, :, :-1, 0] curl[1:, :, :, 1] -= H[1:, :, :, 2] - H[:-1, :, :, 2] curl[1:, :, :, 2] += H[1:, :, :, 1] - H[:-1, :, :, 1] curl[:, 1:, :, 2] -= H[:, 1:, :, 0] - H[:, :-1, :, 0] return curl
## FDTD Grid Class
[docs]class Grid: """The FDTD Grid The grid is the core of the FDTD Library. It is where everything comes together and where the biggest part of the calculations are done. """ from .visualization import visualize
[docs] def __init__( self, shape: Tuple[Number, Number, Number], grid_spacing: float = 155e-9, permittivity: float = 1.0, permeability: float = 1.0, courant_number: float = None, ): """ Args: shape: shape of the FDTD grid. grid_spacing: distance between the grid cells. permittivity: the relative permittivity of the background. permeability: the relative permeability of the background. courant_number: the courant number of the FDTD simulation. Defaults to the inverse of the square root of the number of dimensions > 1 (optimal value). The timestep of the simulation will be derived from this number using the CFL-condition. """ # save the grid spacing self.grid_spacing = float(grid_spacing) # save grid shape as integers self.Nx, self.Ny, self.Nz = self._handle_tuple(shape) # dimension of the simulation: self.D = int(self.Nx > 1) + int(self.Ny > 1) + int(self.Nz > 1) # courant number of the simulation (optimal value) max_courant_number = float(self.D) ** (-0.5) if courant_number is None: # slight stability factor added self.courant_number = 0.99 * max_courant_number elif courant_number > max_courant_number: raise ValueError( f"courant_number {courant_number} too high for " f"a {self.D}D simulation" ) else: self.courant_number = float(courant_number) # timestep of the simulation self.time_step = self.courant_number * self.grid_spacing / SPEED_LIGHT # save electric and magnetic field self.E = bd.zeros((self.Nx, self.Ny, self.Nz, 3)) self.H = bd.zeros((self.Nx, self.Ny, self.Nz, 3)) # save the inverse of the relative permittiviy and the relative permeability # these tensors can be anisotropic! if bd.is_array(permittivity) and len(permittivity.shape) == 3: permittivity = permittivity[:, :, :, None] self.inverse_permittivity = bd.ones((self.Nx, self.Ny, self.Nz, 3)) / float( permittivity ) if bd.is_array(permeability) and len(permeability.shape) == 3: permeability = permeability[:, :, :, None] self.inverse_permeability = bd.ones((self.Nx, self.Ny, self.Nz, 3)) / float( permeability ) # save current time index self.time_steps_passed = 0 # dictionary containing the sources: self.sources = [] # dictionary containing the boundaries self.boundaries = [] # dictionary containing the detectors self.detectors = [] # dictionary containing the objects in the grid self.objects = [] # folder path to store the simulation self.folder = None
def _handle_distance(self, distance: Number) -> int: """ transform a distance to an integer number of gridpoints """ if not isinstance(distance, int): return int(float(distance) / self.grid_spacing + 0.5) return distance def _handle_time(self, time: Number) -> int: """ transform a time value to an integer number of timesteps """ if not isinstance(time, int): return int(float(time) / self.time_step + 0.5) return time def _handle_tuple( self, shape: Tuple[Number, Number, Number] ) -> Tuple[int, int, int]: """ validate the grid shape and transform to a length-3 tuple of ints """ if len(shape) != 3: raise ValueError( f"invalid grid shape {shape}\n" f"grid shape should be a 3D tuple containing floats or ints" ) x, y, z = shape x = self._handle_distance(x) y = self._handle_distance(y) z = self._handle_distance(z) return x, y, z def _handle_slice(self, s: slice) -> slice: """ validate the slice and transform possibly float values to ints """ start = ( s.start if not isinstance(s.start, float) else self._handle_distance(s.start) ) stop = ( s.stop if not isinstance(s.stop, float) else self._handle_distance(s.stop) ) step = ( s.step if not isinstance(s.step, float) else self._handle_distance(s.step) ) return slice(start, stop, step) def _handle_single_key(self, key): """ transform a single index key to a slice or list """ try: len(key) return [self._handle_distance(k) for k in key] except TypeError: if isinstance(key, slice): return self._handle_slice(key) else: return [self._handle_distance(key)] return key @property def x(self) -> int: """ get the number of grid cells in the x-direction """ return self.Nx * self.grid_spacing @property def y(self) -> int: """ get the number of grid cells in the y-direction """ return self.Ny * self.grid_spacing @property def z(self) -> int: """ get the number of grid cells in the y-direction """ return self.Nz * self.grid_spacing @property def shape(self) -> Tuple[int, int, int]: """ get the shape of the FDTD grid """ return (self.Nx, self.Ny, self.Nz) @property def time_passed(self) -> float: """ get the total time passed """ return self.time_steps_passed * self.time_step
[docs] def run(self, total_time: Number, progress_bar: bool = True): """run an FDTD simulation. Args: total_time: the total time for the simulation to run. progress_bar: choose to show a progress bar during simulation """ if isinstance(total_time, float): total_time /= self.time_step time = range(0, int(total_time), 1) if progress_bar: time = tqdm(time) for _ in time: self.step()
[docs] def step(self): """do a single FDTD step by first updating the electric field and then updating the magnetic field """ self.update_E() self.update_H() self.time_steps_passed += 1
[docs] def update_E(self): """ update the electric field by using the curl of the magnetic field """ # update boundaries: step 1 for boundary in self.boundaries: boundary.update_phi_E() curl = curl_H(self.H) self.E += self.courant_number * self.inverse_permittivity * curl # update objects for obj in self.objects: obj.update_E(curl) # update boundaries: step 2 for boundary in self.boundaries: boundary.update_E() # add sources to grid: for src in self.sources: src.update_E() # detect electric field for det in self.detectors: det.detect_E()
[docs] def update_H(self): """ update the magnetic field by using the curl of the electric field """ # update boundaries: step 1 for boundary in self.boundaries: boundary.update_phi_H() curl = curl_E(self.E) self.H -= self.courant_number * self.inverse_permeability * curl # update objects for obj in self.objects: obj.update_H(curl) # update boundaries: step 2 for boundary in self.boundaries: boundary.update_H() # add sources to grid: for src in self.sources: src.update_H() # detect electric field for det in self.detectors: det.detect_H()
[docs] def reset(self): """ reset the grid by setting all fields to zero """ self.H *= 0.0 self.E *= 0.0 self.time_steps_passed *= 0
[docs] def add_source(self, name, source): """ add a source to the grid """ source._register_grid(self) self.sources[name] = source
[docs] def add_boundary(self, name, boundary): """ add a boundary to the grid """ boundary._register_grid(self) self.boundaries[name] = boundary
[docs] def add_detector(self, name, detector): """ add a detector to the grid """ detector._register_grid(self) self.detectors[name] = detector
[docs] def add_object(self, name, obj): """ add an object to the grid """ obj._register_grid(self) self.objects[name] = obj
def __setitem__(self, key, attr): if not isinstance(key, tuple): x, y, z = key, slice(None), slice(None) elif len(key) == 1: x, y, z = key[0], slice(None), slice(None) elif len(key) == 2: x, y, z = key[0], key[1], slice(None) elif len(key) == 3: x, y, z = key else: raise KeyError("maximum number of indices for the grid is 3") attr._register_grid( grid=self, x=self._handle_single_key(x), y=self._handle_single_key(y), z=self._handle_single_key(z), ) def __repr__(self): return ( f"{self.__class__.__name__}(shape=({self.Nx},{self.Ny},{self.Nz}), " f"grid_spacing={self.grid_spacing:.2e}, courant_number={self.courant_number:.2f})" ) def __str__(self): """string representation of the grid lists all the components and their locations in the grid. """ s = repr(self) + "\n" if self.sources: s = s + "\nsources:\n" for src in self.sources: s += str(src) if self.detectors: s = s + "\ndetectors:\n" for det in self.detectors: s += str(det) if self.boundaries: s = s + "\nboundaries:\n" for bnd in self.boundaries: s += str(bnd) if self.objects: s = s + "\nobjects:\n" for obj in self.objects: s += str(obj) return s
[docs] def save_simulation(self, sim_name=None): """ Creates a folder and initializes environment to store simulation or related details. saveSimulation() needs to be run before running any function that stores data (generate_video(), save_data()). Parameters:- (optional) sim_name (string): Preferred name for simulation """ makedirs("fdtd_output", exist_ok=True) # Output master folder declaration # making full_sim_name with timestamp full_sim_name = ( str( + "-" + str( + "-" + str( + "-" + str( + "-" + str( + "-" + str( ) # Simulation name (optional) if sim_name is not None: full_sim_name = full_sim_name + " (" + sim_name + ")" folder = "fdtd_output_" + full_sim_name # storing folder path for saving simulation self.folder = os.path.abspath(path.join("fdtd_output", folder)) # storing timestamp title for self.generate_video self.full_sim_name = full_sim_name makedirs(self.folder, exist_ok=True) return self.folder
[docs] def generate_video(self, delete_frames=False): """Compiles frames into a video These framed should be saved through ``fdtd.Grid.visualize(save=True)`` while having ``fdtd.Grid.save_simulation()`` enabled. Args: delete_frames (optional, bool): delete stored frames after conversion to video. Returns: the filename of the generated video. Note: this function requires ``ffmpeg`` to be available in your path. """ if self.folder is None: raise Exception( "Save location not initialized. Please read about 'fdtd.Grid.saveSimulation()' or try running 'grid.saveSimulation()'." ) cwd = path.abspath(os.getcwd()) chdir(self.folder) try: check_call( [ "ffmpeg", "-y", "-framerate", "8", "-i", "file%04d.png", "-r", "30", "-pix_fmt", "yuv420p", "fdtd_sim_video_" + self.full_sim_name + ".mp4", ] ) except (FileNotFoundError, CalledProcessError): raise CalledProcessError( "Error when calling ffmpeg. Is ffmpeg installed and available in your path?" ) if delete_frames: # delete frames for file_name in glob("*.png"): remove(file_name) video_path = path.abspath( path.join(self.folder, f"fdtd_sim_video_{self.full_sim_name}.mp4") ) chdir(cwd) return video_path
[docs] def save_data(self): """ Saves readings from all detectors in the grid into a numpy zip file. Each detector is stored in separate arrays. Electric and magnetic field field readings of each detector are also stored separately with suffix " (E)" and " (H)" (Example: ['detector0 (E)', 'detector0 (H)']). Therefore, the numpy zip file contains arrays twice the number of detectors. REQUIRES 'fdtd.Grid.save_simulation()' to be run before this function. Parameters: None """ if self.folder is None: raise Exception( "Save location not initialized. Please read about 'fdtd.Grid.saveSimulation()' or try running 'grid.saveSimulation()'." ) dic = {} for detector in self.detectors: dic[ + " (E)"] = [x for x in detector.detector_values()["E"]] dic[ + " (H)"] = [x for x in detector.detector_values()["H"]] savez(path.join(self.folder, "detector_readings"), **dic)