Source code for at.acceptance.floodfill_acceptance

"""This is an implementation of the Flood Fill algorithm for pyat."""

from multiprocessing import Manager, Process, Queue

import numpy as np

import at

# Author : E. Serra,  UAB and ALBA,  2025 original version in python
#                                    See. IPAC2025, MOPB065.
#                                    APPLICATION OF FAST ALGORITHMS TO
#                                    CALCULATE DYNAMIC AND
#                                    MOMENTUM APERTURE TO THE DESIGN OF
#                                    ALBA II.
#                                    JACoW-IPAC25-MOPB065
# Edited : O. Blanco, ALBA,          2025 pyat parallel version

__all__ = ["floodfill"]


[docs] def floodfill( ring: at.Lattice, nturns: int = 1000, window: list | tuple = (-10e-3, 10e-3, -10e-3, 10e-3), grid_size: list | tuple = (10, 10), axes: list | tuple = (0, 2), offset: list | np.ndarray | None = None, verbose: bool = False, use_mp: bool | at.MPMode = True, pool_size: int = 10, ) -> np.ndarray: """Find the 2D acceptance of the ring using Flood Fill [1]_. Flood fill [1]_ tracks particles from the exterior to the border of the acceptance. The lost particles are returned in plost. The not lost particles are returned in pnotlost. Parameters: ring: pyat lattice nturns: Number of turns for the tracking. Default: 1000 window: Min and max coordinate range, [Axis1min,Axis1max, Axis2min,Axis2max]. Default [-10e-3,10e-3,-10e-3,10e-3]. Axis1 and Axis2 are defined by 'axes'. grid_size: Number of steps per axis. Default [10,10]. axes: Indexes of axes to be scanned. Default is [0,2], i.e. x-y offset: Offset to be added. Default np.zeros((6)). This is useful to study off-axis acceptance on any plane, or off-momentum acceptance by adding dp to the 5th coord., to track particles on the closed orbit, or to add a small deviation to the tracked coordinates, e.g. [10e-5 10e-5] in the transverse planes. verbose: Print extra info. Default 0. use_mp: Parallel tracking, default True. It uses at.MPMode.CPU, not at.MPMode.GPU. pool_size: Number of cpus. Returns: A (4,n) array with the following rows: Position on axis 1 of the n tracked particles Position on axis 2 of the n tracked particles Flag set to 1 if the nth particle is lost, otherwise 0 Number of turns the n particle did during tracking Example: >>> ff_data = floodfill(ring, nturns=500) >>> alive_mask = ff_data[2,:] == 0 .. Note:: This method is recommended for single or low number of CPUs, and does not scale well for parallel computing. References: .. [1] B. Riemann, M. Aiba, J. Kallestrup, and A. Streun, "Efficient algorithms for dynamic aperture and momentum acceptance calculation in synchrotron light sources", Phys. Rev. Accel. Beams, vol. 27, no. 9, p. 094 002, 2024. doi:10.1103/PhysRevAccelBeams.27.094002 """ def track_queue( ring: at.Lattice, zin: np.ndarray, nturns: int, n_x: int, n_y: int, task_to_accomplish: list, islost: list, final_turn: list, registered_for_tracking: list, ) -> None: """Track particles with index inside queue. Parameters: ring: pyat ring zin: particles coordinates (6,nparticles) nturns: number of tracking turns n_x: horizontal grid size n_y: vertical grid size task_to_accomplish: list with indexes of particles to track islost: list with particle loss result from tracking final_turn: turn when the particle is lost registered_for_tracking: particle tracked or to be tracked """ while not task_to_accomplish.empty(): task_id = task_to_accomplish.get(block=True, timeout=10) registered_for_tracking[task_id] = True *_, losses_data = ring.track(zin[:, task_id], nturns, losses=True) islost[task_id] = losses_data["loss_map"]["islost"][0] final_turn[task_id] = losses_data["loss_map"]["turn"][0] if islost[task_id]: # next particles id_move = np.array( [task_id + 1, task_id - 1, task_id + n_y, task_id - n_y] ) # use only valid index mask = np.logical_and(id_move >= 0, id_move < (n_x * n_y)) newid = id_move[mask] for i in newid: if not registered_for_tracking[i]: registered_for_tracking[i] = True task_to_accomplish.put(i) # Initialize variables if offset is None: offset = 6 * [0] offset = np.array(offset) # Initialize output in case we return earlier data_tracked = np.zeros((4, 0)) verboseprint = print if verbose else lambda *a, **k: None verboseprint("Flood fill starts.") verboseprint(f"Maximum number of turns: {nturns}") if window[0] == window[1] or window[2] == window[3]: verboseprint("Window is too narrow") return data_tracked # set the grid if np.any(np.asarray(grid_size) < 1): print("Horizontal and vertical grid size should be more than 1") return data_tracked n_x, n_y = grid_size min_x = min(window[0:2]) max_x = max(window[0:2]) min_y = min(window[2:4]) max_y = max(window[2:4]) xvals = np.linspace(min_x, max_x, n_x) yvals = np.linspace(min_y, max_y, n_y) nparticles = n_x * n_y points = np.zeros((2, nparticles)) verboseprint(f"Number of grid points: {nparticles}") # set the order ii_ = 0 for ix_ in range(n_x): for iy_ in range(n_y): points[0, ii_] = xvals[ix_] points[1, ii_] = yvals[iy_] ii_ = ii_ + 1 ndims = 6 particles = np.zeros((nparticles, ndims)) particles = particles + offset particles[:, axes[0]] = particles[:, axes[0]] + points[0, :] particles[:, axes[1]] = particles[:, axes[1]] + points[1, :] # parallel parameters nproc = pool_size if use_mp else 1 verboseprint(f"Number of processors: {nproc}") task_to_accomplish = Queue() processes = [] leftside = np.arange(1, n_y - 1) rightside = np.arange((n_x - 1) * n_y + 1, n_x * n_y - 1) lowerline = np.asarray([*range(0, n_y * n_x, n_y)]) upperline = lowerline + n_y - 1 manager = Manager() registered_for_tracking = manager.list(nparticles * [False]) for i in np.concatenate((upperline, leftside, lowerline, rightside)): task_to_accomplish.put(i) registered_for_tracking[i] = True verboseprint("Tracking...") # creating processes islost = manager.list(nparticles * [False]) final_turn = manager.list(nparticles * [-1]) for _worker in range(nproc): prc = Process( target=track_queue, args=( ring, particles.T, nturns, n_x, n_y, task_to_accomplish, islost, final_turn, registered_for_tracking, ), ) processes.append(prc) prc.start() # completing process for prc in processes: prc.join() verboseprint("done") ft_ = np.array(final_turn) mask_tracked = ft_ != -1 mask_islost = np.array(islost) turns_per_point = np.zeros(sum(mask_tracked)) points_tracked = points[:, mask_tracked] lost_after_track = mask_islost[mask_tracked] turns_per_point[lost_after_track] = ft_[mask_islost] turns_per_point[~lost_after_track] = nturns return np.vstack((points_tracked, lost_after_track, turns_per_point))