"""This is an implementation of the Flood Fill algorithm for pyat."""
from multiprocessing import Manager, Process, Queue
import numpy as np
import at
# Author : E. Serra, UAB and ALBA, 2025 original version in python
# See. IPAC2025, MOPB065.
# APPLICATION OF FAST ALGORITHMS TO
# CALCULATE DYNAMIC AND
# MOMENTUM APERTURE TO THE DESIGN OF
# ALBA II.
# JACoW-IPAC25-MOPB065
# Edited : O. Blanco, ALBA, 2025 pyat parallel version
__all__ = ["floodfill"]
[docs]
def floodfill(
ring: at.Lattice,
nturns: int = 1000,
window: list | tuple = (-10e-3, 10e-3, -10e-3, 10e-3),
grid_size: list | tuple = (10, 10),
axes: list | tuple = (0, 2),
offset: list | np.ndarray | None = None,
verbose: bool = False,
use_mp: bool | at.MPMode = True,
pool_size: int = 10,
) -> np.ndarray:
"""Find the 2D acceptance of the ring using Flood Fill [1]_.
Flood fill [1]_ tracks particles from the exterior to the border of the
acceptance.
The lost particles are returned in plost.
The not lost particles are returned in pnotlost.
Parameters:
ring: pyat lattice
nturns: Number of turns for the tracking. Default: 1000
window: Min and max coordinate range, [Axis1min,Axis1max,
Axis2min,Axis2max]. Default [-10e-3,10e-3,-10e-3,10e-3].
Axis1 and Axis2 are defined by 'axes'.
grid_size: Number of steps per axis. Default [10,10].
axes: Indexes of axes to be scanned. Default is [0,2], i.e. x-y
offset: Offset to be added. Default np.zeros((6)).
This is useful to study off-axis acceptance on any plane,
or off-momentum acceptance by adding dp to the 5th coord.,
to track particles on the closed orbit, or to add
a small deviation to the tracked coordinates,
e.g. [10e-5 10e-5] in the transverse planes.
verbose: Print extra info. Default 0.
use_mp: Parallel tracking, default True. It uses at.MPMode.CPU,
not at.MPMode.GPU.
pool_size: Number of cpus.
Returns:
A (4,n) array with the following rows:
Position on axis 1 of the n tracked particles
Position on axis 2 of the n tracked particles
Flag set to 1 if the nth particle is lost, otherwise 0
Number of turns the n particle did during tracking
Example:
>>> ff_data = floodfill(ring, nturns=500)
>>> alive_mask = ff_data[2,:] == 0
.. Note::
This method is recommended for single or low number of CPUs, and does not scale
well for parallel computing.
References:
.. [1] B. Riemann, M. Aiba, J. Kallestrup, and A. Streun, "Efficient
algorithms for dynamic aperture and momentum acceptance
calculation in synchrotron light sources",
Phys. Rev. Accel. Beams, vol. 27, no. 9, p. 094 002, 2024.
doi:10.1103/PhysRevAccelBeams.27.094002
"""
def track_queue(
ring: at.Lattice,
zin: np.ndarray,
nturns: int,
n_x: int,
n_y: int,
task_to_accomplish: list,
islost: list,
final_turn: list,
registered_for_tracking: list,
) -> None:
"""Track particles with index inside queue.
Parameters:
ring: pyat ring
zin: particles coordinates (6,nparticles)
nturns: number of tracking turns
n_x: horizontal grid size
n_y: vertical grid size
task_to_accomplish: list with indexes of particles to track
islost: list with particle loss result from tracking
final_turn: turn when the particle is lost
registered_for_tracking: particle tracked or to be tracked
"""
while not task_to_accomplish.empty():
task_id = task_to_accomplish.get(block=True, timeout=10)
registered_for_tracking[task_id] = True
*_, losses_data = ring.track(zin[:, task_id], nturns, losses=True)
islost[task_id] = losses_data["loss_map"]["islost"][0]
final_turn[task_id] = losses_data["loss_map"]["turn"][0]
if islost[task_id]:
# next particles
id_move = np.array(
[task_id + 1, task_id - 1, task_id + n_y, task_id - n_y]
)
# use only valid index
mask = np.logical_and(id_move >= 0, id_move < (n_x * n_y))
newid = id_move[mask]
for i in newid:
if not registered_for_tracking[i]:
registered_for_tracking[i] = True
task_to_accomplish.put(i)
# Initialize variables
if offset is None:
offset = 6 * [0]
offset = np.array(offset)
# Initialize output in case we return earlier
data_tracked = np.zeros((4, 0))
verboseprint = print if verbose else lambda *a, **k: None
verboseprint("Flood fill starts.")
verboseprint(f"Maximum number of turns: {nturns}")
if window[0] == window[1] or window[2] == window[3]:
verboseprint("Window is too narrow")
return data_tracked
# set the grid
if np.any(np.asarray(grid_size) < 1):
print("Horizontal and vertical grid size should be more than 1")
return data_tracked
n_x, n_y = grid_size
min_x = min(window[0:2])
max_x = max(window[0:2])
min_y = min(window[2:4])
max_y = max(window[2:4])
xvals = np.linspace(min_x, max_x, n_x)
yvals = np.linspace(min_y, max_y, n_y)
nparticles = n_x * n_y
points = np.zeros((2, nparticles))
verboseprint(f"Number of grid points: {nparticles}")
# set the order
ii_ = 0
for ix_ in range(n_x):
for iy_ in range(n_y):
points[0, ii_] = xvals[ix_]
points[1, ii_] = yvals[iy_]
ii_ = ii_ + 1
ndims = 6
particles = np.zeros((nparticles, ndims))
particles = particles + offset
particles[:, axes[0]] = particles[:, axes[0]] + points[0, :]
particles[:, axes[1]] = particles[:, axes[1]] + points[1, :]
# parallel parameters
nproc = pool_size if use_mp else 1
verboseprint(f"Number of processors: {nproc}")
task_to_accomplish = Queue()
processes = []
leftside = np.arange(1, n_y - 1)
rightside = np.arange((n_x - 1) * n_y + 1, n_x * n_y - 1)
lowerline = np.asarray([*range(0, n_y * n_x, n_y)])
upperline = lowerline + n_y - 1
manager = Manager()
registered_for_tracking = manager.list(nparticles * [False])
for i in np.concatenate((upperline, leftside, lowerline, rightside)):
task_to_accomplish.put(i)
registered_for_tracking[i] = True
verboseprint("Tracking...")
# creating processes
islost = manager.list(nparticles * [False])
final_turn = manager.list(nparticles * [-1])
for _worker in range(nproc):
prc = Process(
target=track_queue,
args=(
ring,
particles.T,
nturns,
n_x,
n_y,
task_to_accomplish,
islost,
final_turn,
registered_for_tracking,
),
)
processes.append(prc)
prc.start()
# completing process
for prc in processes:
prc.join()
verboseprint("done")
ft_ = np.array(final_turn)
mask_tracked = ft_ != -1
mask_islost = np.array(islost)
turns_per_point = np.zeros(sum(mask_tracked))
points_tracked = points[:, mask_tracked]
lost_after_track = mask_islost[mask_tracked]
turns_per_point[lost_after_track] = ft_[mask_islost]
turns_per_point[~lost_after_track] = nturns
return np.vstack((points_tracked, lost_after_track, turns_per_point))