Source code for at.acceptance.boundary

"""
Functions used to
calculate the loss boundary for different
grid definitions
"""

from at.lattice import Lattice, AtError
from at.tracking import lattice_pass, patpass
from typing import Optional, Sequence
from enum import Enum
import numpy
from scipy.ndimage import binary_dilation, binary_opening
from collections import namedtuple
import time

__all__ = ['GridMode']

_pdict = {'x': 0, 'xp': 1,
          'y': 2, 'yp': 3,
          'dp': 4, 'ct': 5}


[docs]class GridMode(Enum): """ Grid definition for 2D acceptance boundary search """ RADIAL = 0 #: full [:math:`\:r, \theta\:`] grid CARTESIAN = 1 #: full [:math:`\:x, y\:`] grid RECURSIVE = 2 #: radial recursive search
def grid_config(planes, amplitudes, npoints, bounds, grid_mode, shift_zero): """" Returns an object that defines the grid configuration """ bounds = numpy.atleast_2d(bounds) bounds = tuple(map(tuple, bounds)) shape = numpy.array(npoints, dtype=numpy.int32) d = {'planes': numpy.atleast_1d(planes), 'planesi': numpy.atleast_1d(get_plane_index(planes)), 'amplitudes': numpy.atleast_1d(amplitudes), 'shape': numpy.atleast_1d(shape), 'bounds': bounds, 'mode': grid_mode, 'shift_zero': shift_zero} return namedtuple('config', d.keys())(**d) def grid(grid, offset): """" Returns a grid object """ d = {'grid': numpy.atleast_2d(grid), 'offset': numpy.atleast_1d(offset)} return namedtuple('grid', d.keys())(**d) def get_plane_index(planes): """ Converts plane to particle coordinate index """ planesi = numpy.array([], dtype=numpy.int32) for i, p in enumerate(numpy.atleast_1d(planes)): if isinstance(p, str): try: planesi = numpy.append(planesi, _pdict[p]) except KeyError: raise AtError('Allowed values for plane are x,xp,y,yp,dp,ct') else: raise AtError('Allowed values for plane are x,xp,y,yp,dp,ct') return planesi def set_ring_orbit(ring, dp, obspt, orbit): """ Returns a ring starting at obspt and initial closed orbit """ if obspt is not None: assert numpy.isscalar(obspt), 'Scalar value needed for obspt' ring = ring.rotate(obspt) if orbit is None: orbit = ring.find_orbit(dp=dp)[0] return orbit, ring def grid_configuration(planes, npoints, amplitudes, grid_mode, bounds=None, shift_zero=1.0e-6): """ Return a grid configuration based on user input parameters, the ordering is as follows: CARTESIAN: (x,y), RADIAL/RECURSIVE (r, theta). Scalar inputs can be used for 1D grid """ ndims = len(numpy.atleast_1d(planes)) if ndims > 2 or ndims == 0: raise AtError('planes can have 1 or 2 element (1D or 2D aperture)') elif ndims == 1 and grid_mode is GridMode.RADIAL: grid_mode = GridMode.CARTESIAN if numpy.shape(numpy.atleast_1d(npoints)) != (ndims,): raise AtError('npoints shape should be (len(planes),)') if numpy.shape(numpy.atleast_1d(amplitudes)) != (ndims,): raise AtError('amplitudes shape should be (len(planes),)') if (numpy.shape(numpy.atleast_2d(bounds)) != (ndims, 2) and bounds is not None): raise AtError('bounds shape should be (len(planes),2)') if grid_mode is GridMode.RADIAL or grid_mode is GridMode.RECURSIVE: if bounds is None: bounds = numpy.array([[0, 1], [numpy.pi, 0]]) bounds[0][bounds[0] == 0] = 1.0e-6 elif grid_mode is GridMode.CARTESIAN: if bounds is None: bounds = numpy.array([[p-1, 1] for p in range(ndims)]) else: raise AtError('GridMode {0} undefined.'.format(grid_mode)) config = grid_config(planes, amplitudes, npoints, bounds, grid_mode, shift_zero) return config def get_parts(config, offset): """ Generate a (6,n) particles array based on the grid configuration and initial offset, returns a grid object containing the grid and the offset from which the array can be reconstructed """ def get_part_grid_uniform(bnd, np, amp): x = [numpy.linspace(*b, n)*a for a, b, n in zip(amp, bnd, np)] try: g1, g2 = numpy.meshgrid(*x) except ValueError: g1, g2 = numpy.meshgrid(x, 0.0) return numpy.array([g1.flatten(), g2.flatten()]) def get_part_grid_radial(bnd, np, amp): x = [numpy.linspace(*b, n) for b, n in zip(bnd, np)] g1, g2 = numpy.meshgrid(*x) g1r = amp[0]*numpy.cos(g2)*g1 g2r = amp[1]*numpy.sin(g2)*g1 return numpy.array([g1r.flatten(), g2r.flatten()]) pind = config.planesi amp = config.amplitudes np = config.shape bnd = config.bounds gm = config.mode if gm is GridMode.CARTESIAN: g = get_part_grid_uniform(bnd, np, amp) elif gm is GridMode.RADIAL: g = get_part_grid_radial(bnd, np, amp) parts = numpy.zeros((6, numpy.prod(np))) parts[pind, :] = [g[i] for i in range(len(pind))] offset += config.shift_zero parts = (parts.T+offset).T return parts, grid(g, offset[pind]) def get_survived(parts, ring, nturns, use_mp, **kwargs): """ Track a grid through the ring and extract survived particles """ if use_mp: _ = patpass(ring, parts, nturns=nturns, **kwargs) else: _ = lattice_pass(ring, parts, nturns=nturns, **kwargs) if parts.ndim == 1: survived = numpy.invert(numpy.isnan(parts[0])) else: survived = numpy.invert(numpy.isnan(parts[0, :])) return survived def get_grid_boundary(mask, grid, config): """ Compute the boundary of survided particles array """ def nearest_order(grid): # keep only max r for each angle angle = numpy.arctan2(*grid) norm = numpy.linalg.norm(grid.T, axis=1) val, inv = numpy.unique(angle, return_inverse=True) gf = numpy.zeros((2, len(val))) for i, v in enumerate(val): inds = numpy.where(inv == i)[0] ni = norm[inds] nim = numpy.where(ni == numpy.amax(ni))[0] ind = inds[nim][0] gf[:, i] = grid[:, ind] # first sort by angle idx = numpy.argsort(numpy.arctan2(*gf)) gf = gf[:, idx] # now sort by closest neighbour on normalized grid x, y = gf[0, :].copy(), gf[1, :].copy() dxmin = min(numpy.diff(numpy.unique(x))) dymin = min(numpy.diff(numpy.unique(y))) iorder = [0] for i in range(1, len(x)): xnow = x[iorder[-1]] ynow = y[iorder[-1]] dd = numpy.sqrt(((x-xnow)/dxmin)**2+((y-ynow)/dymin)**2) if i <= 3: ic = [j for j in numpy.argsort(dd) if j not in iorder] else: direction = numpy.sign(iorder[-1]-iorder[-2]) ic = [j for j in numpy.argsort(dd) if j not in iorder and numpy.sign(j-iorder[-1]) == direction] if len(ic) > 0: iorder.append(ic[0]) # finally connect both ends if distance within unit square xnow = x[iorder[-1]] ynow = y[iorder[-1]] xs = x[iorder[0]] ys = y[iorder[0]] dd = numpy.sqrt(((xs-xnow)/dxmin)**2+((ys-ynow)/dymin)**2) if dd < 1.5: iorder.append(iorder[0]) gf = gf[:, iorder] return gf def search_bnd(ma, sa): bnd = numpy.zeros((2, 1)) for j, m in enumerate(ma): bnd = sa[:, j] if not m and j > 0: bnd = sa[:, j-1] break return bnd def grid_boundary(mask, grid, config, nclean=2): cnt = numpy.flip(config.shape) if len(cnt) == 1: return vector_boundary(mask, grid) mask2d = numpy.reshape(mask.copy(), cnt) # remove isolated points for i in range(nclean): bnd1 = binary_opening(mask2d, numpy.ones((2, 1))) bnd2 = binary_opening(mask2d, numpy.ones((1, 2))) bnd = numpy.logical_and(bnd1, bnd2) k = numpy.zeros((3, 3), dtype=int) k[1] = 1 k[:, 1] = 1 bnd = numpy.logical_and(binary_dilation(bnd == 0, border_value=1), bnd) bnd = grid.grid[:, bnd.reshape(mask.shape)] return nearest_order(bnd) def vector_boundary(mask, grid): g = grid.grid xp, xn = g[0] >= 0, g[0] <= 0 bp = search_bnd(mask[xp], g[:, xp]) bn = search_bnd(numpy.flip(mask[xn]), numpy.flip(g[:, xn], axis=1)) return numpy.squeeze([bn[0], bp[0]]) def radial_boundary(mask, grid): angles = numpy.round(numpy.arctan2(*grid.grid), decimals=8) angles, invi = numpy.unique(angles, return_inverse=True) bnd = numpy.zeros((2, len(angles))) for i in range(len(angles)): sa = grid.grid[:, invi == i] ma = mask[invi == i] bnd[:, i] = search_bnd(ma, sa) return bnd if not numpy.any(mask): raise AtError("No particle survived, please check your grid " "or lattice.") if config.mode is GridMode.RADIAL: return radial_boundary(mask, grid) elif config.mode is GridMode.CARTESIAN: return grid_boundary(mask, grid, config) else: raise AtError('GridMode {0} undefined.'.format(grid.mode)) def grid_boundary_search(ring, planes, npoints, amplitudes, nturns=1024, obspt=None, dp=None, offset=None, bounds=None, grid_mode=GridMode.RADIAL, use_mp=False, verbose=True, shift_zero=1.0e-9, **kwargs): """ Search for the boundary by tracking a grid """ offset, newring = set_ring_orbit(ring, dp, obspt, offset) config = grid_configuration(planes, npoints, amplitudes, grid_mode, bounds=bounds, shift_zero=shift_zero) if verbose: print('\nRunning grid boundary search:') if obspt is None: print('Element {0}, obspt={1}'.format(ring[0].FamName, 0)) else: print('Element {0}, obspt={1}'.format(ring[obspt].FamName, obspt)) print('The grid mode is {0}'.format(config.mode)) print('The planes are {0}'.format(config.planes)) print('Number of steps are {0}'.format(config.shape)) print('The maximum amplitudes are {0}'.format(config.amplitudes)) print('The maximum boundaries are {0}'.format(config.bounds)) print('The initial offset is {0} with dp={1}'.format(offset, dp)) t0 = time.time() parts, grid = get_parts(config, offset) mask = get_survived(parts, newring, nturns, use_mp, **kwargs) survived = grid.grid[:, mask] boundary = get_grid_boundary(mask, grid, config) if verbose: print('Calculation took {0}'.format(time.time()-t0)) return boundary, survived, grid.grid def recursive_boundary_search(ring, planes, npoints, amplitudes, nturns=1024, obspt=None, dp=None, offset=None, bounds=None, use_mp=False, divider=2, verbose=True, shift_zero=1.0e-9, **kwargs): """ Recursively search for the boundary in a given plane and direction (angle) """ def search_boundary(planesi, angles, rtol, rsteps, nturns, offset, use_mp, **kwargs): ftol = min(rtol/rsteps) cs = numpy.squeeze([numpy.cos(angles), numpy.sin(angles)]) cs = numpy.around(cs, decimals=9) fact = numpy.ones(len(angles)) survived = numpy.full(len(angles), True) istracked = numpy.full(len(angles), True) part = numpy.zeros((6, len(angles))) grid = numpy.array([]) mask = numpy.array([]) while numpy.any(survived): for i, pi in enumerate(planesi): part[pi, survived] += cs[i, survived]*rsteps[i]*fact[survived] istracked = numpy.array([not numpy.any([numpy.allclose(p, g, rtol=1.0e-9) for g in grid.T]) for p in part[planesi].T]) survived = numpy.array([numpy.any([numpy.allclose(p, m, rtol=1.0e-9) for m in mask.T]) for p in part[planesi].T]) pt = part[:, istracked] grid = numpy.hstack([grid, pt[planesi]]) if grid.size else pt[planesi] ptmp = (pt.T + offset).T survived[istracked] = get_survived(ptmp, newring, nturns, use_mp, **kwargs) pm = part[:, numpy.logical_and(istracked, survived)] mask = numpy.hstack([mask, pm[planesi]]) if mask.size else pm[planesi] for i in range(len(angles)): if not survived[i] and fact[i] > ftol: deltas = cs[:, i]*rsteps[:]*min(1, 2*fact[i]) if numpy.any(abs(deltas) > abs(part[planesi, i])): part[planesi, i] = numpy.zeros(len(planesi)) else: for j, pi in enumerate(planesi): part[pi, i] -= deltas[j] survived[i] = True fact[i] *= 1/divider for i, pi in enumerate(planesi): part[pi] -= cs[i]*rsteps[i]*fact p = numpy.squeeze(part[planesi]) return p, mask, grid offset, newring = set_ring_orbit(ring, dp, obspt, offset) config = grid_configuration(planes, npoints, amplitudes, GridMode.RECURSIVE, bounds=bounds, shift_zero=shift_zero) rtol = min(numpy.atleast_1d(config.amplitudes/config.shape)) rstep = config.amplitudes if len(numpy.atleast_1d(config.shape)) == 2: angles = numpy.linspace(*config.bounds[1], config.shape[1]) else: angles = numpy.linspace(*config.bounds[1], 2) angles = numpy.atleast_1d(angles) if verbose: print('\nRunning recursive boundary search:') if obspt is None: print('Element {0}, obspt={1}'.format(ring[0].FamName, 0)) else: print('Element {0}, obspt={1}'.format(ring[obspt].FamName, obspt)) print('The grid mode is {0}'.format(config.mode)) print('The planes are {0}'.format(config.planes)) print('Number of angles is {0} from {1} to {2} rad'.format(len(angles), angles[0], angles[-1])) print('The resolution of the search is {0}'.format(rtol)) print('The initial step size is {0}'.format(rstep)) print('The initial offset is {0} with dp={1}'.format(offset, dp)) t0 = time.time() result = search_boundary(config.planesi, angles, rtol, rstep, nturns, offset, use_mp, **kwargs) if verbose: print('Calculation took {0}'.format(time.time()-t0)) return result def boundary_search(ring: Lattice, planes, npoints, amplitudes, nturns: Optional[int] = 1024, obspt: Optional[int] = None, dp: Optional[float] = None, offset: Sequence[float] = None, bounds=None, grid_mode: Optional[GridMode] = GridMode.RADIAL, use_mp: Optional[bool] = False, verbose: Optional[bool] = True, shift_zero: Optional[float] = 1.0e-9, **kwargs): """ Computes the loss boundary at a single point in the machine """ divider = kwargs.pop('divider', 2) if grid_mode is GridMode.RECURSIVE: result = recursive_boundary_search(ring, planes, npoints, amplitudes, nturns=nturns, obspt=obspt, dp=dp, offset=offset, bounds=bounds, use_mp=use_mp, verbose=verbose, divider=divider, shift_zero=shift_zero, **kwargs) else: result = grid_boundary_search(ring, planes, npoints, amplitudes, nturns=nturns, obspt=obspt, dp=dp, offset=offset, bounds=bounds, grid_mode=grid_mode, use_mp=use_mp, verbose=verbose, shift_zero=shift_zero, **kwargs) return result