Source code for at.acceptance.floodfill_acceptance

"""This is an implementation of the Flood Fill algorithm for pyat."""

from multiprocessing import Manager, Process, Queue

import numpy as np

import at
from at.lattice import Lattice, axisdef
from at.tracking.track import MPMode

# Author : E. Serra,  UAB and ALBA,  2025 original version in python
#                                    See. IPAC2025, MOPB065.
#                                    APPLICATION OF FAST ALGORITHMS TO
#                                    CALCULATE DYNAMIC AND
#                                    MOMENTUM APERTURE TO THE DESIGN OF
#                                    ALBA II.
#                                    JACoW-IPAC25-MOPB065
# Edited : O. Blanco, ALBA,          2025 pyat parallel version

__all__ = ["floodfill"]


[docs] def floodfill( ring: Lattice, nturns: int = 1024, planes: list | tuple = ("x", "y"), amplitudes: list | tuple = (10e-3, 10e-3), bounds: list | tuple = ((-1, 1), (0, 1)), npoints: list | tuple = (10, 10), offset: list | np.ndarray | None = None, verbose: bool = False, use_mp: bool | MPMode = True, pool_size: int = 10, ) -> np.ndarray: """Find the 2D acceptance of the ring using Flood Fill [1]_. Flood fill [1]_ tracks particles from the exterior to the border of the acceptance. The lost particles are returned in plost. The not lost particles are returned in pnotlost. Parameters: ring: pyat lattice nturns: Number of turns for the tracking. Default: 1024 planes: max. dimension 2, Plane(s) to scan for the acceptance. Allowed values are: ``'x'``, ``'px'``, ``'y'``, ``'py'``, ``'dp'``, ``'ct'`` amplitudes: (2,) array, set the search range per plane. Default [10e-3,10e-3] bounds: (2,2) array, defines the tracked range: range=bounds*amplitude Default ((-1,1),(0,1)) npoints: Number of steps per axis. Default (10,10). offset: Offset to be added. Default np.zeros((6)). This is useful to study off-axis acceptance on any plane, or off-momentum acceptance by adding dp to the 5th coord., to track particles on the closed orbit, or to add a small deviation to the tracked coordinates, e.g. [10e-5 10e-5] in the transverse planes. verbose: Print extra info. Default 0. use_mp: Parallel tracking, default True. It uses at.MPMode.CPU, not at.MPMode.GPU. pool_size: Number of cpus. Returns: A (4,n) array with the following rows: Position on axis 1 of the n tracked particles Position on axis 2 of the n tracked particles Number of turns the n particle did during tracking Flag set to 1 if the nth particle is lost, otherwise 0 Example: >>> ff_data = floodfill(ring, nturns=500) >>> alive_mask = ff_data[3,:] == 0 .. Note:: This method is recommended for single or low number of CPUs, and does not scale well for parallel computing. References: .. [1] B. Riemann, M. Aiba, J. Kallestrup, and A. Streun, "Efficient algorithms for dynamic aperture and momentum acceptance calculation in synchrotron light sources", Phys. Rev. Accel. Beams, vol. 27, no. 9, p. 094 002, 2024. doi:10.1103/PhysRevAccelBeams.27.094002 """ def track_queue( ring: Lattice, zin: np.ndarray, nturns: int, n_x: int, n_y: int, task_to_accomplish: list, islost: list, final_turn: list, registered_for_tracking: list, ) -> None: """Track particles with index inside queue. Parameters: ring: pyat ring zin: particles coordinates (6,nparticles) nturns: number of tracking turns n_x: horizontal grid size n_y: vertical grid size task_to_accomplish: list with indexes of particles to track islost: list with particle loss result from tracking final_turn: turn when the particle is lost registered_for_tracking: particle tracked or to be tracked """ while not task_to_accomplish.empty(): task_id = task_to_accomplish.get(block=True, timeout=10) registered_for_tracking[task_id] = True *_, losses_data = ring.track(zin[:, task_id], nturns, losses=True) islost[task_id] = losses_data["loss_map"]["islost"][0] final_turn[task_id] = losses_data["loss_map"]["turn"][0] if islost[task_id]: # next particles id_move = np.array( [task_id + 1, task_id - 1, task_id + n_y, task_id - n_y] ) # use only valid index mask = np.logical_and(id_move >= 0, id_move < (n_x * n_y)) newid = id_move[mask] for i in newid: if not registered_for_tracking[i]: registered_for_tracking[i] = True task_to_accomplish.put(i) # Initialize variables if offset is None: offset = 6 * [0] offset = np.array(offset) axesi = np.atleast_1d(axisdef.axis_(tuple(planes), key="index")) # Initialize output in case we return earlier data_tracked = np.zeros((4, 0)) verboseprint = print if verbose else lambda *a, **k: None verboseprint("Flood fill starts.") verboseprint(f"Maximum number of turns: {nturns}") window = np.ravel((np.array(bounds).reshape((2, 2)).T * np.array(amplitudes)).T) if window[0] == window[1] or window[2] == window[3]: verboseprint("Window is too narrow") return data_tracked # set the grid if np.any(np.asarray(npoints) < 1): print("Horizontal and vertical grid size should be more than 1") return data_tracked n_x, n_y = npoints min_x = min(window[0:2]) max_x = max(window[0:2]) min_y = min(window[2:4]) max_y = max(window[2:4]) xvals = np.linspace(min_x, max_x, n_x) yvals = np.linspace(min_y, max_y, n_y) nparticles = n_x * n_y points = np.zeros((2, nparticles)) verboseprint(f"Number of grid points: {nparticles}") # set the order ii_ = 0 for ix_ in range(n_x): for iy_ in range(n_y): points[0, ii_] = xvals[ix_] points[1, ii_] = yvals[iy_] ii_ = ii_ + 1 ndims = 6 particles = np.zeros((nparticles, ndims)) particles = particles + offset particles[:, axesi[0]] = particles[:, axesi[0]] + points[0, :] particles[:, axesi[1]] = particles[:, axesi[1]] + points[1, :] # parallel parameters nproc = pool_size if use_mp else 1 verboseprint(f"Number of processors: {nproc}") task_to_accomplish = Queue() processes = [] leftside = np.arange(1, n_y - 1) rightside = np.arange((n_x - 1) * n_y + 1, n_x * n_y - 1) lowerline = np.asarray([*range(0, n_y * n_x, n_y)]) upperline = lowerline + n_y - 1 manager = Manager() registered_for_tracking = manager.list(nparticles * [False]) for i in np.concatenate((upperline, leftside, lowerline, rightside)): task_to_accomplish.put(i) registered_for_tracking[i] = True verboseprint("Tracking...") # creating processes islost = manager.list(nparticles * [False]) final_turn = manager.list(nparticles * [-1]) for _worker in range(nproc): prc = Process( target=track_queue, args=( ring, particles.T, nturns, n_x, n_y, task_to_accomplish, islost, final_turn, registered_for_tracking, ), ) processes.append(prc) prc.start() # completing process for prc in processes: prc.join() verboseprint("done") ft_ = np.array(final_turn) mask_tracked = ft_ != -1 mask_islost = np.array(islost) turns_per_point = np.zeros(sum(mask_tracked)) points_tracked = points[:, mask_tracked] lost_after_track = mask_islost[mask_tracked] turns_per_point[lost_after_track] = ft_[mask_islost] turns_per_point[~lost_after_track] = nturns return np.vstack((points_tracked, turns_per_point, lost_after_track))