"""
Functions used to
calculate the loss boundary for different
grid definitions
"""
from at.lattice import Lattice, AtError
from at.tracking import lattice_pass, patpass
from typing import Optional, Sequence
from enum import Enum
import numpy
from scipy.ndimage import binary_dilation, binary_opening
from collections import namedtuple
import time
__all__ = ['GridMode']
_pdict = {'x': 0, 'xp': 1,
'y': 2, 'yp': 3,
'dp': 4, 'ct': 5}
[docs]class GridMode(Enum):
"""
Grid definition for 2D acceptance boundary search
"""
RADIAL = 0 #: full [:math:`\:r, \theta\:`] grid
CARTESIAN = 1 #: full [:math:`\:x, y\:`] grid
RECURSIVE = 2 #: radial recursive search
def grid_config(planes, amplitudes, npoints, bounds, grid_mode,
shift_zero):
""""
Returns an object that defines the grid configuration
"""
bounds = numpy.atleast_2d(bounds)
bounds = tuple(map(tuple, bounds))
shape = numpy.array(npoints, dtype=numpy.int32)
d = {'planes': numpy.atleast_1d(planes),
'planesi': numpy.atleast_1d(get_plane_index(planes)),
'amplitudes': numpy.atleast_1d(amplitudes),
'shape': numpy.atleast_1d(shape),
'bounds': bounds,
'mode': grid_mode,
'shift_zero': shift_zero}
return namedtuple('config', d.keys())(**d)
def grid(grid, offset):
""""
Returns a grid object
"""
d = {'grid': numpy.atleast_2d(grid),
'offset': numpy.atleast_1d(offset)}
return namedtuple('grid', d.keys())(**d)
def get_plane_index(planes):
"""
Converts plane to particle coordinate index
"""
planesi = numpy.array([], dtype=numpy.int32)
for i, p in enumerate(numpy.atleast_1d(planes)):
if isinstance(p, str):
try:
planesi = numpy.append(planesi, _pdict[p])
except KeyError:
raise AtError('Allowed values for plane are x,xp,y,yp,dp,ct')
else:
raise AtError('Allowed values for plane are x,xp,y,yp,dp,ct')
return planesi
def set_ring_orbit(ring, dp, obspt, orbit):
"""
Returns a ring starting at obspt and initial
closed orbit
"""
if obspt is not None:
assert numpy.isscalar(obspt), 'Scalar value needed for obspt'
ring = ring.rotate(obspt)
if orbit is None:
orbit = ring.find_orbit(dp=dp)[0]
return orbit, ring
def grid_configuration(planes, npoints, amplitudes, grid_mode, bounds=None,
shift_zero=1.0e-6):
"""
Return a grid configuration based on user input parameters, the ordering
is as follows: CARTESIAN: (x,y), RADIAL/RECURSIVE (r, theta).
Scalar inputs can be used for 1D grid
"""
ndims = len(numpy.atleast_1d(planes))
if ndims > 2 or ndims == 0:
raise AtError('planes can have 1 or 2 element (1D or 2D aperture)')
elif ndims == 1 and grid_mode is GridMode.RADIAL:
grid_mode = GridMode.CARTESIAN
if numpy.shape(numpy.atleast_1d(npoints)) != (ndims,):
raise AtError('npoints shape should be (len(planes),)')
if numpy.shape(numpy.atleast_1d(amplitudes)) != (ndims,):
raise AtError('amplitudes shape should be (len(planes),)')
if (numpy.shape(numpy.atleast_2d(bounds)) != (ndims, 2)
and bounds is not None):
raise AtError('bounds shape should be (len(planes),2)')
if grid_mode is GridMode.RADIAL or grid_mode is GridMode.RECURSIVE:
if bounds is None:
bounds = numpy.array([[0, 1], [numpy.pi, 0]])
bounds[0][bounds[0] == 0] = 1.0e-6
elif grid_mode is GridMode.CARTESIAN:
if bounds is None:
bounds = numpy.array([[p-1, 1] for p in range(ndims)])
else:
raise AtError('GridMode {0} undefined.'.format(grid_mode))
config = grid_config(planes, amplitudes, npoints,
bounds, grid_mode, shift_zero)
return config
def get_parts(config, offset):
"""
Generate a (6,n) particles array based on the grid configuration
and initial offset, returns a grid object containing the grid and
the offset from which the array can be reconstructed
"""
def get_part_grid_uniform(bnd, np, amp):
x = [numpy.linspace(*b, n)*a for a, b, n in zip(amp, bnd, np)]
try:
g1, g2 = numpy.meshgrid(*x)
except ValueError:
g1, g2 = numpy.meshgrid(x, 0.0)
return numpy.array([g1.flatten(), g2.flatten()])
def get_part_grid_radial(bnd, np, amp):
x = [numpy.linspace(*b, n) for b, n in zip(bnd, np)]
g1, g2 = numpy.meshgrid(*x)
g1r = amp[0]*numpy.cos(g2)*g1
g2r = amp[1]*numpy.sin(g2)*g1
return numpy.array([g1r.flatten(), g2r.flatten()])
pind = config.planesi
amp = config.amplitudes
np = config.shape
bnd = config.bounds
gm = config.mode
if gm is GridMode.CARTESIAN:
g = get_part_grid_uniform(bnd, np, amp)
elif gm is GridMode.RADIAL:
g = get_part_grid_radial(bnd, np, amp)
parts = numpy.zeros((6, numpy.prod(np)))
parts[pind, :] = [g[i] for i in range(len(pind))]
offset += config.shift_zero
parts = (parts.T+offset).T
return parts, grid(g, offset[pind])
def get_survived(parts, ring, nturns, use_mp, **kwargs):
"""
Track a grid through the ring and extract survived particles
"""
if use_mp:
_ = patpass(ring, parts, nturns=nturns, **kwargs)
else:
_ = lattice_pass(ring, parts, nturns=nturns, **kwargs)
if parts.ndim == 1:
survived = numpy.invert(numpy.isnan(parts[0]))
else:
survived = numpy.invert(numpy.isnan(parts[0, :]))
return survived
def get_grid_boundary(mask, grid, config):
"""
Compute the boundary of survided particles array
"""
def nearest_order(grid):
# keep only max r for each angle
angle = numpy.arctan2(*grid)
norm = numpy.linalg.norm(grid.T, axis=1)
val, inv = numpy.unique(angle, return_inverse=True)
gf = numpy.zeros((2, len(val)))
for i, v in enumerate(val):
inds = numpy.where(inv == i)[0]
ni = norm[inds]
nim = numpy.where(ni == numpy.amax(ni))[0]
ind = inds[nim][0]
gf[:, i] = grid[:, ind]
# first sort by angle
idx = numpy.argsort(numpy.arctan2(*gf))
gf = gf[:, idx]
# now sort by closest neighbour on normalized grid
x, y = gf[0, :].copy(), gf[1, :].copy()
dxmin = min(numpy.diff(numpy.unique(x)))
dymin = min(numpy.diff(numpy.unique(y)))
iorder = [0]
for i in range(1, len(x)):
xnow = x[iorder[-1]]
ynow = y[iorder[-1]]
dd = numpy.sqrt(((x-xnow)/dxmin)**2+((y-ynow)/dymin)**2)
if i <= 3:
ic = [j for j in numpy.argsort(dd) if j not in iorder]
else:
direction = numpy.sign(iorder[-1]-iorder[-2])
ic = [j for j in numpy.argsort(dd) if j not in iorder
and numpy.sign(j-iorder[-1]) == direction]
if len(ic) > 0:
iorder.append(ic[0])
# finally connect both ends if distance within unit square
xnow = x[iorder[-1]]
ynow = y[iorder[-1]]
xs = x[iorder[0]]
ys = y[iorder[0]]
dd = numpy.sqrt(((xs-xnow)/dxmin)**2+((ys-ynow)/dymin)**2)
if dd < 1.5:
iorder.append(iorder[0])
gf = gf[:, iorder]
return gf
def search_bnd(ma, sa):
bnd = numpy.zeros((2, 1))
for j, m in enumerate(ma):
bnd = sa[:, j]
if not m and j > 0:
bnd = sa[:, j-1]
break
return bnd
def grid_boundary(mask, grid, config, nclean=2):
cnt = numpy.flip(config.shape)
if len(cnt) == 1:
return vector_boundary(mask, grid)
mask2d = numpy.reshape(mask.copy(), cnt)
# remove isolated points
for i in range(nclean):
bnd1 = binary_opening(mask2d, numpy.ones((2, 1)))
bnd2 = binary_opening(mask2d, numpy.ones((1, 2)))
bnd = numpy.logical_and(bnd1, bnd2)
k = numpy.zeros((3, 3), dtype=int)
k[1] = 1
k[:, 1] = 1
bnd = numpy.logical_and(binary_dilation(bnd == 0, border_value=1), bnd)
bnd = grid.grid[:, bnd.reshape(mask.shape)]
return nearest_order(bnd)
def vector_boundary(mask, grid):
g = grid.grid
xp, xn = g[0] >= 0, g[0] <= 0
bp = search_bnd(mask[xp], g[:, xp])
bn = search_bnd(numpy.flip(mask[xn]),
numpy.flip(g[:, xn], axis=1))
return numpy.squeeze([bn[0], bp[0]])
def radial_boundary(mask, grid):
angles = numpy.round(numpy.arctan2(*grid.grid), decimals=8)
angles, invi = numpy.unique(angles, return_inverse=True)
bnd = numpy.zeros((2, len(angles)))
for i in range(len(angles)):
sa = grid.grid[:, invi == i]
ma = mask[invi == i]
bnd[:, i] = search_bnd(ma, sa)
return bnd
if not numpy.any(mask):
raise AtError("No particle survived, please check your grid "
"or lattice.")
if config.mode is GridMode.RADIAL:
return radial_boundary(mask, grid)
elif config.mode is GridMode.CARTESIAN:
return grid_boundary(mask, grid, config)
else:
raise AtError('GridMode {0} undefined.'.format(grid.mode))
def grid_boundary_search(ring, planes, npoints, amplitudes, nturns=1024,
obspt=None, dp=None, offset=None, bounds=None,
grid_mode=GridMode.RADIAL, use_mp=False,
verbose=True, shift_zero=1.0e-9, **kwargs):
"""
Search for the boundary by tracking a grid
"""
offset, newring = set_ring_orbit(ring, dp, obspt,
offset)
config = grid_configuration(planes, npoints, amplitudes,
grid_mode, bounds=bounds,
shift_zero=shift_zero)
if verbose:
print('\nRunning grid boundary search:')
if obspt is None:
print('Element {0}, obspt={1}'.format(ring[0].FamName, 0))
else:
print('Element {0}, obspt={1}'.format(ring[obspt].FamName,
obspt))
print('The grid mode is {0}'.format(config.mode))
print('The planes are {0}'.format(config.planes))
print('Number of steps are {0}'.format(config.shape))
print('The maximum amplitudes are {0}'.format(config.amplitudes))
print('The maximum boundaries are {0}'.format(config.bounds))
print('The initial offset is {0} with dp={1}'.format(offset, dp))
t0 = time.time()
parts, grid = get_parts(config, offset)
mask = get_survived(parts, newring, nturns, use_mp, **kwargs)
survived = grid.grid[:, mask]
boundary = get_grid_boundary(mask, grid, config)
if verbose:
print('Calculation took {0}'.format(time.time()-t0))
return boundary, survived, grid.grid
def recursive_boundary_search(ring, planes, npoints, amplitudes, nturns=1024,
obspt=None, dp=None, offset=None, bounds=None,
use_mp=False, divider=2, verbose=True,
shift_zero=1.0e-9, **kwargs):
"""
Recursively search for the boundary in a given plane and direction (angle)
"""
def search_boundary(planesi, angles, rtol, rsteps, nturns,
offset, use_mp, **kwargs):
ftol = min(rtol/rsteps)
cs = numpy.squeeze([numpy.cos(angles), numpy.sin(angles)])
cs = numpy.around(cs, decimals=9)
fact = numpy.ones(len(angles))
survived = numpy.full(len(angles), True)
istracked = numpy.full(len(angles), True)
part = numpy.zeros((6, len(angles)))
grid = numpy.array([])
mask = numpy.array([])
while numpy.any(survived):
for i, pi in enumerate(planesi):
part[pi, survived] += cs[i, survived]*rsteps[i]*fact[survived]
istracked = numpy.array([not numpy.any([numpy.allclose(p, g,
rtol=1.0e-9)
for g in grid.T])
for p in part[planesi].T])
survived = numpy.array([numpy.any([numpy.allclose(p, m,
rtol=1.0e-9)
for m in mask.T])
for p in part[planesi].T])
pt = part[:, istracked]
grid = numpy.hstack([grid,
pt[planesi]]) if grid.size else pt[planesi]
ptmp = (pt.T + offset).T
survived[istracked] = get_survived(ptmp, newring, nturns,
use_mp, **kwargs)
pm = part[:, numpy.logical_and(istracked, survived)]
mask = numpy.hstack([mask,
pm[planesi]]) if mask.size else pm[planesi]
for i in range(len(angles)):
if not survived[i] and fact[i] > ftol:
deltas = cs[:, i]*rsteps[:]*min(1, 2*fact[i])
if numpy.any(abs(deltas) > abs(part[planesi, i])):
part[planesi, i] = numpy.zeros(len(planesi))
else:
for j, pi in enumerate(planesi):
part[pi, i] -= deltas[j]
survived[i] = True
fact[i] *= 1/divider
for i, pi in enumerate(planesi):
part[pi] -= cs[i]*rsteps[i]*fact
p = numpy.squeeze(part[planesi])
return p, mask, grid
offset, newring = set_ring_orbit(ring, dp, obspt, offset)
config = grid_configuration(planes, npoints, amplitudes,
GridMode.RECURSIVE, bounds=bounds,
shift_zero=shift_zero)
rtol = min(numpy.atleast_1d(config.amplitudes/config.shape))
rstep = config.amplitudes
if len(numpy.atleast_1d(config.shape)) == 2:
angles = numpy.linspace(*config.bounds[1], config.shape[1])
else:
angles = numpy.linspace(*config.bounds[1], 2)
angles = numpy.atleast_1d(angles)
if verbose:
print('\nRunning recursive boundary search:')
if obspt is None:
print('Element {0}, obspt={1}'.format(ring[0].FamName, 0))
else:
print('Element {0}, obspt={1}'.format(ring[obspt].FamName,
obspt))
print('The grid mode is {0}'.format(config.mode))
print('The planes are {0}'.format(config.planes))
print('Number of angles is {0} from {1} to {2} rad'.format(len(angles),
angles[0], angles[-1]))
print('The resolution of the search is {0}'.format(rtol))
print('The initial step size is {0}'.format(rstep))
print('The initial offset is {0} with dp={1}'.format(offset, dp))
t0 = time.time()
result = search_boundary(config.planesi, angles, rtol, rstep,
nturns, offset, use_mp, **kwargs)
if verbose:
print('Calculation took {0}'.format(time.time()-t0))
return result
def boundary_search(ring: Lattice, planes, npoints, amplitudes,
nturns: Optional[int] = 1024,
obspt: Optional[int] = None, dp: Optional[float] = None,
offset: Sequence[float] = None, bounds=None,
grid_mode: Optional[GridMode] = GridMode.RADIAL,
use_mp: Optional[bool] = False,
verbose: Optional[bool] = True,
shift_zero: Optional[float] = 1.0e-9,
**kwargs):
"""
Computes the loss boundary at a single point in the machine
"""
divider = kwargs.pop('divider', 2)
if grid_mode is GridMode.RECURSIVE:
result = recursive_boundary_search(ring, planes, npoints, amplitudes,
nturns=nturns, obspt=obspt, dp=dp,
offset=offset, bounds=bounds,
use_mp=use_mp, verbose=verbose,
divider=divider,
shift_zero=shift_zero,
**kwargs)
else:
result = grid_boundary_search(ring, planes, npoints, amplitudes,
nturns=nturns, obspt=obspt, dp=dp,
offset=offset, bounds=bounds,
grid_mode=grid_mode, use_mp=use_mp,
verbose=verbose, shift_zero=shift_zero,
**kwargs)
return result