Source code for at.acceptance.boundary

"""
Functions used to calculate the loss boundary for different grid definitions.
"""

import time
import warnings
from collections import namedtuple
from collections.abc import Sequence
from enum import Enum
from typing import Optional

import numpy as np
from scipy.ndimage import binary_dilation, binary_opening

from at.lattice import AtError, AtWarning, Lattice, Refpts, axisdef

from .floodfill_acceptance import floodfill

__all__ = ["GridMode"]


[docs] class GridMode(Enum): """ Grid definition for 2D acceptance boundary search. """ RADIAL = 0 #: full [:math:`\:r, \theta\:`] grid CARTESIAN = 1 #: full [:math:`\:x, y\:`] grid RECURSIVE = 2 #: radial recursive search FLOODFILL = 3 #: from exterior to stable boundary
def grid_config(axes, amplitudes, npoints, bounds, grid_mode, shift_zero): """ Returns an object that defines the grid configuration """ bounds = np.atleast_2d(bounds) bounds = tuple(map(tuple, bounds)) shape = np.array(npoints, dtype=np.int32) d = { "axes": np.atleast_1d(axes), "axesi": np.atleast_1d(axisdef.axis_(axes, key="index")), "amplitudes": np.atleast_1d(amplitudes), "shape": np.atleast_1d(shape), "bounds": bounds, "mode": grid_mode, "shift_zero": shift_zero, } return namedtuple("config", d.keys())(**d) def grid(grid, offset): """ Returns a grid object """ d = {"grid": np.atleast_2d(grid), "offset": np.atleast_1d(offset)} return namedtuple("grid", d.keys())(**d) def set_ring_orbit(ring, dp, obspt, orbit): """ Returns initial closed orbit and rotated ring """ if obspt is not None: assert np.isscalar(obspt), "Scalar value needed for obspt" ringrot = ring.rotate(obspt) else: ringrot = ring if orbit is None: # need this call on ring to use keep_lattice=True in obs loop orbit0, orbitobs = ring.find_orbit(dp=dp, refpts=obspt) orbit = orbit0 if obspt is None else np.squeeze(orbitobs) return orbit, ringrot def grid_configuration( axes, npoints, amplitudes, grid_mode, bounds=None, shift_zero=1.0e-6 ): """ Return a grid configuration based on user input parameters, the ordering is as follows: CARTESIAN: (x,y), RADIAL/RECURSIVE (r, theta). Scalar inputs can be used for 1D grid """ ndims = len(np.atleast_1d(axes)) if ndims > 2 or ndims == 0: raise AtError("axes can have 1 or 2 element (1D or 2D aperture)") elif ndims == 1 and grid_mode is GridMode.RADIAL: grid_mode = GridMode.CARTESIAN if np.shape(np.atleast_1d(npoints)) != (ndims,): raise AtError("npoints shape should be (len(axes),)") if np.shape(np.atleast_1d(amplitudes)) != (ndims,): raise AtError("amplitudes shape should be (len(axes),)") if np.shape(np.atleast_2d(bounds)) != (ndims, 2) and bounds is not None: raise AtError("bounds shape should be (len(axes),2)") if grid_mode is GridMode.RADIAL or grid_mode is GridMode.RECURSIVE: if bounds is None: bounds = np.array([[0, 1], [np.pi, 0]]) bounds[0][bounds[0] == 0] = 1.0e-6 elif (grid_mode is GridMode.CARTESIAN) or (grid_mode is GridMode.FLOODFILL): if bounds is None: bounds = np.array([[p - 1, 1] for p in range(ndims)]) else: raise AtError("GridMode {0} undefined.".format(grid_mode)) config = grid_config(axes, amplitudes, npoints, bounds, grid_mode, shift_zero) return config def get_parts(config, offset): """ Generate a (6,n) particles array based on the grid configuration and initial offset, returns a grid object containing the grid and the offset from which the array can be reconstructed """ def get_part_grid_uniform(bnd, npt, amp): x = [np.linspace(*b, n) * a for a, b, n in zip(amp, bnd, npt)] try: g1, g2 = np.meshgrid(*x) except ValueError: g1, g2 = np.meshgrid(x, 0.0) return np.array([g1.flatten(), g2.flatten()]) def get_part_grid_radial(bnd, npt, amp): x = [np.linspace(*b, n) for b, n in zip(bnd, npt)] g1, g2 = np.meshgrid(*x) g1r = amp[0] * np.cos(g2) * g1 g2r = amp[1] * np.sin(g2) * g1 return np.array([g1r.flatten(), g2r.flatten()]) pind = config.axesi amp = config.amplitudes npt = config.shape bnd = config.bounds gm = config.mode if gm is GridMode.CARTESIAN: g = get_part_grid_uniform(bnd, npt, amp) elif gm is GridMode.RADIAL: g = get_part_grid_radial(bnd, npt, amp) parts = np.zeros((6, np.prod(npt))) parts[pind, :] = [g[i] for i in range(len(pind))] offset = np.array(offset) + config.shift_zero parts = (parts.T + offset).T return parts, grid(g, offset[pind]) def get_survived(parts, ring, nturns, use_mp, **kwargs): """ Track a grid through the ring and extract survived particles """ _, _, td = ring.track( parts, nturns=nturns, losses=True, use_mp=use_mp, refpts=None, in_place=True, **kwargs, ) return np.invert(td["loss_map"].islost) def get_grid_boundary(mask, grid, config): """ Compute the boundary of survided particles array """ def nearest_order(grid): # keep only max r for each angle angle = np.arctan2(*grid) norm = np.linalg.norm(grid.T, axis=1) val, inv = np.unique(angle, return_inverse=True) gf = np.zeros((2, len(val))) for i, v in enumerate(val): inds = np.where(inv == i)[0] ni = norm[inds] nim = np.where(ni == np.amax(ni))[0] ind = inds[nim][0] gf[:, i] = grid[:, ind] # first sort by angle idx = np.argsort(np.arctan2(*gf)) gf = gf[:, idx] # now sort by closest neighbour on normalized grid x, y = gf[0, :].copy(), gf[1, :].copy() dxmin = min(np.diff(np.unique(x))) dymin = min(np.diff(np.unique(y))) iorder = [0] for i in range(1, len(x)): xnow = x[iorder[-1]] ynow = y[iorder[-1]] dd = np.sqrt(((x - xnow) / dxmin) ** 2 + ((y - ynow) / dymin) ** 2) if i <= 3: ic = [j for j in np.argsort(dd) if j not in iorder] else: direction = np.sign(iorder[-1] - iorder[-2]) ic = [ j for j in np.argsort(dd) if j not in iorder and np.sign(j - iorder[-1]) == direction ] if len(ic) > 0: iorder.append(ic[0]) # finally connect both ends if distance within unit square xnow = x[iorder[-1]] ynow = y[iorder[-1]] xs = x[iorder[0]] ys = y[iorder[0]] dd = np.sqrt(((xs - xnow) / dxmin) ** 2 + ((ys - ynow) / dymin) ** 2) if dd < 1.5: iorder.append(iorder[0]) gf = gf[:, iorder] return gf def search_bnd(ma, sa): bnd = np.zeros((2, 1)) for j, m in enumerate(ma): bnd = sa[:, j] if not m and j > 0: bnd = sa[:, j - 1] break return bnd def grid_boundary(mask, grid, config, nclean=2): cnt = np.flip(config.shape) if len(cnt) == 1: return vector_boundary(mask, grid) mask2d = np.reshape(mask.copy(), cnt) # remove isolated points for i in range(nclean): bnd1 = binary_opening(mask2d, np.ones((2, 1))) bnd2 = binary_opening(mask2d, np.ones((1, 2))) bnd = np.logical_and(bnd1, bnd2) k = np.zeros((3, 3), dtype=int) k[1] = 1 k[:, 1] = 1 bnd = np.logical_and(binary_dilation(bnd == 0, border_value=1), bnd) bnd = grid.grid[:, bnd.reshape(mask.shape)] return nearest_order(bnd) def vector_boundary(mask, grid): g = grid.grid xp, xn = g[0] >= 0, g[0] <= 0 bp = search_bnd(mask[xp], g[:, xp]) bn = search_bnd(np.flip(mask[xn]), np.flip(g[:, xn], axis=1)) return np.squeeze([bn[0], bp[0]]) def radial_boundary(mask, grid): angles = np.round(np.arctan2(*grid.grid), decimals=8) angles, invi = np.unique(angles, return_inverse=True) bnd = np.zeros((2, len(angles))) for i in range(len(angles)): sa = grid.grid[:, invi == i] ma = mask[invi == i] bnd[:, i] = search_bnd(ma, sa) return bnd if not np.any(mask): msg = ( "No particle survived, please check your grid " "or lattice. Acceptance set to [0.0, 0.0]." ) warnings.warn(AtWarning(msg)) cnt = np.flip(config.shape) if len(cnt) == 1: return np.zeros(2) else: return np.zeros((2, 1)) if config.mode is GridMode.RADIAL: return radial_boundary(mask, grid) elif config.mode is GridMode.CARTESIAN: return grid_boundary(mask, grid, config) else: raise AtError("GridMode {0} undefined.".format(grid.mode)) def grid_boundary_search( ring, planes, npoints, amplitudes, nturns=1024, obspt=None, dp=None, offset=None, bounds=None, grid_mode=GridMode.RADIAL, use_mp=False, verbose=True, shift_zero=1.0e-9, **kwargs, ): """ Search for the boundary by tracking a grid """ config = grid_configuration( planes, npoints, amplitudes, grid_mode, bounds=bounds, shift_zero=shift_zero ) if verbose: print("\nRunning grid boundary search:") if len(obspt) == 1: if obspt[0] is None: print("Element {0}, obspt={1}".format(ring[0].FamName, 0)) else: print("Element {0}, obspt={1}".format(ring[obspt[0]].FamName, obspt)) else: print( "{0} Elements from {1}, obspt={2} to {3}, obspt={4}".format( len(obspt), ring[obspt[0]].FamName, obspt[0], ring[obspt[-1]].FamName, obspt[-1], ) ) print("The grid mode is {0}".format(config.mode)) print("The planes are {0}".format(config.planes)) print("Number of steps are {0}".format(config.shape)) print("The maximum amplitudes are {0}".format(config.amplitudes)) print("The maximum boundaries are {0}".format(config.bounds)) t0 = time.time() allparts = [] grids = [] if offset is None: offset = [None for _ in np.arange(len(obspt))] if grid_mode is GridMode.FLOODFILL: s_ffallrefpts = [] g_ffallrefpts = [] for obs, orbit in zip(obspt, offset, strict=True): # set ring and offsets _obs = 0 if obs is None else obs _dpp = 0.0 if dp is None else dp _orbit, _ringrot = set_ring_orbit(ring, _dpp, _obs, orbit) _offset = _orbit + np.array([0, 0, 0, 0, _dpp, 0]) # flood fill data_ff = floodfill( _ringrot, nturns=nturns, planes=config.axes, amplitudes=config.amplitudes, bounds=config.bounds, npoints=config.shape, offset=_offset, verbose=False, use_mp=use_mp, pool_size=kwargs["pool_size"], ) mask_alive = data_ff[3, :] == 0 s_ffallrefpts.append(data_ff[0:2, mask_alive]) g_ffallrefpts.append(data_ff[0:2, :]) if len(obspt) == 1: s_ff = s_ffallrefpts[0] g_ff = g_ffallrefpts[0] else: s_ff = s_ffallrefpts g_ff = g_ffallrefpts # floodfill boundary and survived are the same, # but they need to be sorted _ia = np.argsort(np.arctan2(s_ff[1, :], s_ff[0, :])) b_ff = s_ff[:, _ia] # we return boundary, survived, tracked return b_ff, s_ff, g_ff for i, obs, orbit in zip(np.arange(len(obspt)), obspt, offset): orbit, _ = set_ring_orbit(ring, dp, obs, orbit) parts, grid = get_parts(config, orbit) obs = 0 if obs is None else obs dpp = 0.0 if dp is None else dp if verbose: print( "\r{4}/{5}: Projecting obs=({0}, {1}) to the start of the ring, " "the initial offset is {2} with dp={3}".format( ring[obs].FamName, obs, orbit, dpp, i + 1, len(obspt) ) ) ring.track( parts, use_mp=use_mp, in_place=True, refpts=None, start_elem=obs, keep_lattice=True, **kwargs, ) allparts.append(parts) grids.append(grid) if verbose: print("Starting the multi-turn tracking...") allparts = np.concatenate(allparts, axis=1) mask = get_survived(allparts, ring, nturns, use_mp, **kwargs) mask = np.split(mask, len(grids)) survived = [g.grid[:, m] for g, m in zip(grids, mask)] boundary = [get_grid_boundary(m, g, config) for g, m in zip(grids, mask)] grids = [g.grid for g in grids] if verbose: print("Calculation took {0}".format(time.time() - t0)) if len(obspt) == 1: return boundary[0], survived[0], grids[0] else: return boundary, survived, grids def recursive_boundary_search( ring, planes, npoints, amplitudes, nturns=1024, obspt=None, dp=None, offset=None, bounds=None, use_mp=False, divider=2, verbose=True, shift_zero=1.0e-9, **kwargs, ): """ Recursively search for the boundary in a given axis and direction (angle) """ def search_boundary(axesi, angles, rtol, rsteps, nturns, offset, use_mp, **kwargs): ftol = min(rtol / rsteps) cs = np.squeeze([np.cos(angles), np.sin(angles)]) cs = np.around(cs, decimals=9) fact = np.ones(len(angles)) survived = np.full(len(angles), True) part = np.zeros((6, len(angles))) grid = np.array([]) mask = np.array([]) while np.any(survived): for i, pi in enumerate(axesi): part[pi, survived] += cs[i, survived] * rsteps[i] * fact[survived] istracked = np.array( [ not np.any([np.allclose(p, g, rtol=1.0e-9) for g in grid.T]) for p in part[axesi].T ] ) survived = np.array( [ np.any([np.allclose(p, m, rtol=1.0e-9) for m in mask.T]) for p in part[axesi].T ] ) pt = part[:, istracked] grid = np.hstack([grid, pt[axesi]]) if grid.size else pt[axesi] ptmp = (pt.T + offset).T survived[istracked] = get_survived(ptmp, newring, nturns, use_mp, **kwargs) pm = part[:, np.logical_and(istracked, survived)] mask = np.hstack([mask, pm[axesi]]) if mask.size else pm[axesi] for i in range(len(angles)): if not survived[i] and fact[i] > ftol: deltas = cs[:, i] * rsteps[:] * min(1, 2 * fact[i]) if np.any(abs(deltas) > abs(part[axesi, i])): part[axesi, i] = np.zeros(len(axesi)) else: for j, pi in enumerate(axesi): part[pi, i] -= deltas[j] survived[i] = True fact[i] *= 1 / divider for i, pi in enumerate(axesi): part[pi] -= cs[i] * rsteps[i] * fact p = np.squeeze(part[axesi]) return p, mask, grid offset, newring = set_ring_orbit(ring, dp, obspt, offset) config = grid_configuration( planes, npoints, amplitudes, GridMode.RECURSIVE, bounds=bounds, shift_zero=shift_zero, ) rtol = min(np.atleast_1d(config.amplitudes / config.shape)) rstep = config.amplitudes if len(np.atleast_1d(config.shape)) == 2: angles = np.linspace(*config.bounds[1], config.shape[1]) else: angles = np.linspace(*config.bounds[1], 2) angles = np.atleast_1d(angles) if verbose: print("\nRunning recursive boundary search:") if obspt is None: print("Element {0}, obspt={1}".format(ring[0].FamName, 0)) else: print("Element {0}, obspt={1}".format(ring[obspt].FamName, obspt)) print("The grid mode is {0}".format(config.mode)) print("The planes are {0}".format(config.planes)) print( "Number of angles is {0} from {1} to {2} rad".format( len(angles), angles[0], angles[-1] ) ) print("The resolution of the search is {0}".format(rtol)) print("The initial step size is {0}".format(rstep)) print("The initial offset is {0} with dp={1}".format(offset, dp)) t0 = time.time() result = search_boundary( config.axesi, angles, rtol, rstep, nturns, offset, use_mp, **kwargs ) if verbose: print("Calculation took {0}".format(time.time() - t0)) return result def boundary_search( ring: Lattice, planes, npoints, amplitudes, nturns: Optional[int] = 1024, obspt: Optional[Refpts] = None, dp: Optional[float] = None, offset: Sequence[float] = None, bounds=None, grid_mode: Optional[GridMode] = GridMode.RADIAL, use_mp: Optional[bool] = False, verbose: Optional[bool] = True, shift_zero: Optional[float] = 1.0e-9, **kwargs, ): """ Computes the loss boundary at a single point in the machine """ if obspt is not None: rp = ring.uint32_refpts(obspt) else: rp = np.atleast_1d(obspt) if offset is not None: try: offset = np.broadcast_to(offset, (len(rp), 6)) except ValueError: msg = "offset and refpts have incoherent " "shapes: {0}, {1}".format( np.shape(offset), np.shape(obspt) ) raise AtError(msg) divider = kwargs.pop("divider", 2) if grid_mode is GridMode.RECURSIVE: boundary = [] survived = [] grid = [] if offset is None: offset = [None for _ in rp] for r, o in zip(rp, offset): b, s, g = recursive_boundary_search( ring, planes, npoints, amplitudes, nturns=nturns, obspt=r, dp=dp, offset=o, bounds=bounds, use_mp=use_mp, verbose=verbose, divider=divider, shift_zero=shift_zero, **kwargs, ) boundary.append(b) survived.append(s) grid.append(g) if len(rp) == 1: result = (boundary[0], survived[0], grid[0]) else: result = (boundary, survived, grid) else: result = grid_boundary_search( ring, planes, npoints, amplitudes, nturns=nturns, obspt=rp, dp=dp, offset=offset, bounds=bounds, grid_mode=grid_mode, use_mp=use_mp, verbose=verbose, shift_zero=shift_zero, **kwargs, ) return result