from __future__ import annotations
import warnings
from enum import IntEnum
from collections.abc import Sequence
import numpy as np
from ..constants import clight
from ..lattice import Lattice, AtWarning, AtError, RFCavity, Collective
from ..lattice import Refpts, uint32_refpts, make_copy
from ..lattice.elements.conversions import _array
from ..physics import get_timelag_fromU0
[docs]
class CavityMode(IntEnum):
"""
CavityMode.ACTIVE is an active feedback loop, acting on both
detuning and generator voltages to compensate the beam induced
voltage.
CavityMode.PASSIVE takes a fixed cavity frequency and assumes
zero generator voltage.
CavityMode.PASSIVE_SETVOLTAGE is a feedback loop for a passive harmonic
cavity. The frequency of the cavity is varied in order to maintain
a voltage setpoint.
"""
ACTIVE = 1
PASSIVE = 2
PASSIVE_SETVOLTAGE = 3
[docs]
class FeedbackMode(IntEnum):
"""
FeedbackMode.ONETURN means the feedback is only using the most recent
turn, and the delta is applied each turn multiplied by VoltGain or
PhaseGain.
FeedbackMode.WINDOW means the beam voltage passed to the feedback is
the average of the WINDOW. For this method, buffersize and windowlength
must also be provided.
"""
ONETURN = 1
WINDOW = 2
[docs]
def add_beamloading(
ring: Lattice,
qfactor: float | Sequence[float],
rshunt: float | Sequence[float],
cavpts: Refpts = None,
copy: bool | None = False,
**kwargs,
):
r"""Function to add beam loading to a cavity element, the cavity
element is changed to a beam loading element that combines the energy
kick from both the cavity and the resonator.
Parameters:
ring: Lattice object
qfactor: Q factor. Scalar or array of float are accepted
rshunt: Shunt impedance, [:math:`\Omega`] for longitudinal
Scalar or array of float are accepted
Keyword Arguments:
cavpts (Refpts): refpts of the cavity. If None (default) apply
to all cavity in the ring
Nslice (int): Number of slices per bunch. Default: 101
Nturns (int): Number of turn for the wake field. Default: 1
ZCuts: Limits for fixed slicing, default is adaptive
NormFact (float): Normalization factor
copy: If True, returns a shallow copy of ring with new
beam loading elements. Otherwise, modify ring in-place
cavitymode (CavityMode): Define PASSIVE or ACTIVE cavity
buffersize (int): Size of the history buffer for vbeam, vgen, vbunch
(default 0)
"""
@make_copy(copy)
def apply(ring, cavpts, newelems):
for ref, elem in zip(cavpts, newelems, strict=True):
ring[ref] = elem
if cavpts is None:
cavpts = ring.get_refpts(RFCavity)
else:
cavpts = uint32_refpts(cavpts, len(ring))
qfactor = np.broadcast_to(qfactor, (len(cavpts),))
rshunt = np.broadcast_to(rshunt, (len(cavpts),))
new_elems = []
for ref, qf, rs in zip(cavpts, qfactor, rshunt, strict=True):
cav = ring[ref]
if not isinstance(cav, RFCavity):
raise TypeError(
"Beam loading can only be assigned" + "to an RFCavity element"
)
new_elems.append(BeamLoadingElement.build_from_cav(cav, ring, qf, rs, **kwargs))
return apply(ring, cavpts, new_elems)
[docs]
def remove_beamloading(ring, cavpts: Refpts = None, copy: bool | None = False):
"""Function to remove beam loading from a cavity element, the beam
loading element is changed to a beam loading element that
combines the energy kick from both the cavity and the resonator.
Parameters:
ring: Lattice object
Keyword Arguments:
cavpts (Refpts): refpts of the beam loading Elements.
If None (default) apply to all elements
copy: If True, returns a shallow copy of ring with new
cavity elements. Otherwise, modify ring in-place
"""
@make_copy(copy)
def apply(ring, cavpts, newelems):
for ref, elem in zip(cavpts, newelems, strict=True):
ring[ref] = elem
if cavpts is None:
cavpts = ring.get_refpts(BeamLoadingElement)
else:
cavpts = uint32_refpts(cavpts, len(ring))
new_elems = []
for ref in cavpts:
bl = ring[ref]
if not isinstance(bl, BeamLoadingElement):
raise TypeError("Cannot remove beam loading: " + "not a BeamLoadingElement")
family_name = bl.FamName.replace("_BL", "")
harm = np.round(bl.Frequency * ring.circumference / clight)
new_elems.append(
RFCavity(
family_name,
bl.Length,
bl.Voltage,
bl.Frequency,
harm,
bl.Energy,
TimeLag=getattr(bl, "TimeLag", 0.0),
PhaseLag=getattr(bl, "PhaseLag", 0.0),
)
)
return apply(ring, cavpts, new_elems)
[docs]
class BeamLoadingElement(RFCavity, Collective):
"""Class to generate a beamloading element, inherits from Element
additional argument are ring, cavity, qfactor, rshunt.
"""
default_pass = {False: "DriftPass", True: "BeamLoadingCavityPass"}
_conversions = dict(
RFCavity._conversions,
Rshunt=float,
Qfactor=float,
NormFact=float,
PhaseGain=float,
VoltGain=float,
_beta=float,
_wakefact=float,
_nslice=int,
ZCuts=lambda v: _array(v),
_cavitymode=int,
_nturns=int,
_phis=float,
_buffersize=int,
_vbeam_phasor=lambda v: _array(v, shape=(2,)),
_vbeam=lambda v: _array(v, shape=(2,)),
_vcav=lambda v: _array(v, shape=(2,)),
_vgen=lambda v: _array(v, shape=(4,)),
)
def __init__(
self,
family_name: str,
length: float,
voltage: float,
frequency: float,
ring: Lattice,
qfactor: float,
rshunt: float,
detune: float | None = 0.0,
cavitymode: CavityMode | None = CavityMode.ACTIVE,
fbmode: FeedbackMode | None = FeedbackMode.ONETURN,
buffersize: int | None = 0,
**kwargs,
):
r"""
Parameters:
ring: Lattice object
length: Length of the cavity
voltage: Cavity voltage [V]
frequency: Cavity frequency [Hz]
qfactor: Q factor
rshunt: Shunt impedance, [:math:`\Omega`].
Keyword Arguments:
Nslice (int): Number of slices per bunch. Default: 101
Nturns (int): Number of turn for the wake field. Default: 1
ZCuts: Limits for fixed slicing, default is adaptive
NormFact (float): Normalization factor
detune [Hz] (float): Define how much to detune the cavity from
resonance in unints of Hz
cavitymode (CavityMode): Is cavity ACTIVE (default), PASSIVE or
PASSIVE_SETVOLTAGE (Passive with a voltage feedback).
For PASSIVE_SETVOLTAGE, the voltage setpoint is specified with
passive_voltage
passive_voltage [V] (float): Voltage setpoint with the passive
cavity with feedback.
PhaseGain (float): Used for cavity feedbacks. States the gain on
the phase correction factor to be applied.
VoltGain (float): Used for cavity feedbacks. States the gain on
the phase correction factor to be applied.
buffersize (int): Size of the history buffer for vbeam, vgen,
vbunch (default 0)
feedback_angle_offset: Fixed detuning from optimal tuning
angle [rad]. For a negative slope of the RF voltage at the
synchronous position, the optimum detuning is negative.
Applying a positive feedback_angle_offset will therefore
reduce the detuning. The reverse is true for positive RF
slope.
ts (float): The timelag of the synchronous particle in the
full RF system [m]. If not specified, it will be calculated
using get_timelag_fromU0. Defines the expected position of the
beam to be used for the beam loading setpoints.
fbmode (FeedbackMode): States the mode for the parameter
calculation to be used for the loop. ONETURN (default) takes
only the current turn, compared to WINDOW which takes a
sliding window.
windowlength (int): for WINDOW feedback mode, states the length
[turns] for the sliding window. Must be smaller than
buffersize.
system_harmonic (float): Used to compute the nominal rf frequency
for the given system. e.g. third of fourth harmonic of
rf_frequency. If None, then will be computed to the nearest
integer multiple of rf_frequency.
Returns:
bl_elem (Element): beam loading element
"""
kwargs.setdefault("PassMethod", self.default_pass[True])
if not isinstance(cavitymode, CavityMode):
raise TypeError("cavitymode has to be an " + "instance of CavityMode")
zcuts = kwargs.pop("ZCuts", None)
ts = kwargs.pop("ts", None)
self.system_harmonic = kwargs.pop(
"system_harmonic", int(np.round(frequency / ring.rf_frequency))
)
self.detune = detune
check_frequency = np.abs(frequency - self.system_harmonic * ring.rf_frequency)
if check_frequency > 1.0: # 1 Hz is the limit for the float check
error_string = (
"Cavity must be an integer of rf_frequency, otherwise"
"the phi_s computation will be wrong. Please use the detune"
"argument when adding beamloading to a cavity that is an"
"integer harmonic."
)
raise AtError(error_string)
self.circumference = ring.circumference
self.bunch_spos = ring.bunch_spos
energy = ring.energy
harmonic_number = self.system_harmonic * ring.harmonic_number
self.feedback_angle_offset = kwargs.pop("feedback_angle_offset", 0)
self.Rshunt = rshunt
self.Qfactor = qfactor
self.NormFact = kwargs.pop("NormFact", 1.0)
self.PhaseGain = kwargs.pop("PhaseGain", 1.0)
self.VoltGain = kwargs.pop("VoltGain", 1.0)
self._cavitymode = int(cavitymode)
if self._cavitymode == 1:
if not isinstance(fbmode, FeedbackMode):
err_string = (
"For an active cavity, fbmode has to be defined and an "
"instance of FeedbackMode"
)
raise TypeError(err_string)
self._fbmode = int(fbmode)
else:
self._fbmode = 0
if self.detune == 0 and self._cavitymode == 3:
err_string = (
"Cannot start passive cavity feedback from zero detuning."
"You must decide at the beginning which polarity you want."
"This problem arises because the loop does not know which "
"polarity to take!"
)
raise AtError(err_string)
self._passive_vset = kwargs.pop("passive_voltage", 0.0)
self._beta = ring.beta
self._wakefact = -ring.circumference / (clight * ring.energy * ring.beta**3)
self._nslice = kwargs.pop("Nslice", 101)
self._nturns = kwargs.pop("Nturns", 1)
self._nbunch = ring.nbunch
self._turnhistory = None # Defined here to avoid warning
self._vbunch = None
self._buffersize = buffersize
self._windowlength = kwargs.pop("windowlength", 0)
if self._windowlength > self._buffersize:
err_string = "The windowlength must be smaller than the buffersize"
raise ValueError(err_string)
self._vgen_buffer = np.zeros(1)
self._vbeam_buffer = np.zeros(1)
self._vbunch_buffer = np.zeros(1)
if zcuts is not None:
self.ZCuts = zcuts
super().__init__(
family_name, length, voltage, frequency, harmonic_number, energy, **kwargs
)
if ts is None:
_, ts = get_timelag_fromU0(ring)
self._ts = ts
self._phis = 2 * np.pi * self.Frequency * (-self._ts - self.TimeLag) / clight
# The below is needed because atan2 returns phases between -pi and pi
# this prevents a setpoint being provided that is impossible
# to achieve
while self._phis < -np.pi:
self._phis += 2 * np.pi
while self._phis > np.pi:
self._phis -= 2 * np.pi
self._vbeam_phasor = np.zeros(2)
self._vbeam = np.zeros(2)
self._vgen = np.zeros(4)
cavity_voltage = self.Voltage
if self._cavitymode == 3:
cavity_voltage = self._passive_vset
self._vcav = np.array([cavity_voltage, self._phis])
self.clear_history(ring=ring)
[docs]
def is_compatible(self, other):
return False
[docs]
def clear_history(self, ring=None):
if ring is not None:
self._nbunch = ring.nbunch
current = ring.beam_current
nbunch = ring.nbunch
self._vbunch = np.zeros((nbunch, 2), order="F")
self._init_bl_params(current)
tl = self._nturns * self._nslice * self._nbunch
self._turnhistory = np.zeros((tl, 4), order="F")
if self._buffersize > 0:
self._vgen_buffer = np.zeros((4, self._buffersize), order="F")
self._vbeam_buffer = np.zeros((2, self._buffersize), order="F")
self._vbunch_buffer = np.zeros(
(self._nbunch, 2, self._buffersize), order="F"
)
def _init_bl_params(self, current):
if (self._cavitymode == 1) and (current > 0.0):
vb = 2 * current * self.Rshunt
a = self.Voltage
b = -vb * np.cos(self._phis)
psi = np.arcsin(b / np.sqrt(a**2 + b**2))
if np.isnan(psi):
psi = 0.0
warning_string = (
"Unusual cavity configuration found."
"Setting initial psi to 0 to avoid NaNs"
)
warnings.warn(AtWarning(warning_string), stacklevel=2)
psi += self.feedback_angle_offset
vgen = self.Voltage * np.cos(psi) - vb * np.cos(psi) * np.sin(self._phis)
elif self._cavitymode in {2, 3}:
vgen = 0
psi = np.arctan(
2 * self.Qfactor * (1 - self.Frequency / (self.Frequency + self.detune))
)
else:
vgen = self.Voltage
psi = 0
self._vbeam = np.array([2 * current * self.Rshunt * np.cos(psi), np.pi + psi])
self._vgen = np.array([vgen, psi + self._phis, psi, vgen / np.cos(psi)])
vbp_amp = 2 * current * self.Rshunt * np.cos(psi)
vbp_phase = np.pi + psi
vbp_complex = vbp_amp * np.cos(vbp_phase) + 1j * vbp_amp * np.sin(vbp_phase)
self._vbeam_phasor = np.array([np.abs(vbp_complex), np.angle(vbp_complex)])
@property
def Buffersize(self):
return self._buffersize
@Buffersize.setter
def Buffersize(self, value):
self._buffersize = value
self.clear_history()
@property
def Vgen_buffer(self):
"""Stored generator voltage data."""
return self._vgen_buffer
@property
def Vbeam_buffer(self):
"""Stored beam induced voltage data."""
return self._vbeam_buffer
@property
def Vbunch_buffer(self):
"""Stored bunch induced voltage data."""
return np.moveaxis(self._vbunch_buffer, 0, -1)
@property
def Nslice(self):
"""Number of slices per bunch."""
return self._nslice
@Nslice.setter
def Nslice(self, nslice):
self._nslice = nslice
self.clear_history()
@property
def Nturns(self):
"""Number of turn for the wake field."""
return self._nturns
@Nturns.setter
def Nturns(self, nturns):
self._nturns = nturns
self.clear_history()
@property
def TurnHistory(self):
"""Turn history of the slices center of mass."""
return self._turnhistory
@property
def ResFrequency(self):
"""Resonator frequency."""
delta = (
self.Frequency * np.tan(self.Vgen[2]) / self.Qfactor
) ** 2 + 4 * self.Frequency**2
freqres = (
self.Frequency * np.tan(self.Vgen[2]) / self.Qfactor + np.sqrt(delta)
) / 2
return freqres
@property
def Vbeam(self):
"""Beam phasor (amplitude, phase)."""
return self._vbeam
@property
def Vbunch(self):
"""Bunch phasor (amplitude, phase)."""
return self._vbunch
@property
def Vcav(self):
"""Cavity phasor (amplitude, phase)."""
return self._vcav
@property
def Vgen(self):
"""Generator phasor (amplitude, phase)."""
return self._vgen
@Vgen.setter
def Vgen(self, value):
self._vgen = value
[docs]
@staticmethod
def build_from_cav(
cav: RFCavity,
ring: Sequence,
qfactor: float,
rshunt: float,
cavitymode: CavityMode | None = CavityMode.ACTIVE,
buffersize: int | None = 0,
**kwargs,
):
r"""Function to build the BeamLoadingElement from a cavity
the FamName, Length, Voltage, Frequency and HarmNumber are
taken from the cavity element.
Parameters:
ring: Lattice object
qfactor: Q factor
rshunt: Shunt impedance, [:math:`\Omega`]
Keyword Arguments:
Nslice (int): Number of slices per bunch. Default: 101
Nturns (int): Number of turn for the wake field. Default: 1
ZCuts: Limits for fixed slicing, default is adaptive
NormFact (float): Normalization factor
cavitymode (CavityMode): type of beam loaded cavity ACTIVE
(default) for a cavity with active compensation, or
PASSIVE to only include the beam induced voltage
buffersize (int): Size of the history buffer for vbeam, vgen,
vbunch (default 0)
Returns:
bl_elem (Element): beam loading element
"""
_CAV_ATTRIBUTES = ["Length", "Voltage", "Frequency"]
_EXCL_ATTRIBUTES = ["PassMethod", "Energy", "HarmNumber"]
cav_attrs = dict(cav.items())
family_name = cav_attrs.pop("FamName") + "_BL"
[cav_attrs.pop(k) for k in _EXCL_ATTRIBUTES]
cav_args = [cav_attrs.pop(k, getattr(cav, k)) for k in _CAV_ATTRIBUTES]
if cavitymode == CavityMode.PASSIVE:
if cav_args[1] != 0.0:
warnings.warn(AtWarning("Setting Cavity Voltage to 0"), stacklevel=2)
cav_args[1] = 0.0
return BeamLoadingElement(
family_name,
*cav_args,
ring,
qfactor,
rshunt,
cavitymode=cavitymode,
buffersize=buffersize,
**cav_attrs,
**kwargs,
)
def __repr__(self):
"""Simplified __repr__ to avoid errors due to arguments
not defined as attributes.
"""
att = {k: v for (k, v) in self.items() if not k.startswith("_")}
return f"{self.__class__.__name__}({att})"