Source code for at.collective.beam_loading

from __future__ import annotations

import warnings
from enum import IntEnum
from collections.abc import Sequence

import numpy as np

from ..constants import clight
from ..lattice import Lattice, AtWarning, AtError, RFCavity, Collective
from ..lattice import Refpts, uint32_refpts, make_copy
from ..lattice.elements.conversions import _array
from ..physics import get_timelag_fromU0


[docs] class CavityMode(IntEnum): """ CavityMode.ACTIVE is an active feedback loop, acting on both detuning and generator voltages to compensate the beam induced voltage. CavityMode.PASSIVE takes a fixed cavity frequency and assumes zero generator voltage. CavityMode.PASSIVE_SETVOLTAGE is a feedback loop for a passive harmonic cavity. The frequency of the cavity is varied in order to maintain a voltage setpoint. """ ACTIVE = 1 PASSIVE = 2 PASSIVE_SETVOLTAGE = 3
[docs] class FeedbackMode(IntEnum): """ FeedbackMode.ONETURN means the feedback is only using the most recent turn, and the delta is applied each turn multiplied by VoltGain or PhaseGain. FeedbackMode.WINDOW means the beam voltage passed to the feedback is the average of the WINDOW. For this method, buffersize and windowlength must also be provided. """ ONETURN = 1 WINDOW = 2
[docs] def add_beamloading( ring: Lattice, qfactor: float | Sequence[float], rshunt: float | Sequence[float], cavpts: Refpts = None, copy: bool | None = False, **kwargs, ): r"""Function to add beam loading to a cavity element, the cavity element is changed to a beam loading element that combines the energy kick from both the cavity and the resonator. Parameters: ring: Lattice object qfactor: Q factor. Scalar or array of float are accepted rshunt: Shunt impedance, [:math:`\Omega`] for longitudinal Scalar or array of float are accepted Keyword Arguments: cavpts (Refpts): refpts of the cavity. If None (default) apply to all cavity in the ring Nslice (int): Number of slices per bunch. Default: 101 Nturns (int): Number of turn for the wake field. Default: 1 ZCuts: Limits for fixed slicing, default is adaptive NormFact (float): Normalization factor copy: If True, returns a shallow copy of ring with new beam loading elements. Otherwise, modify ring in-place cavitymode (CavityMode): Define PASSIVE or ACTIVE cavity buffersize (int): Size of the history buffer for vbeam, vgen, vbunch (default 0) """ @make_copy(copy) def apply(ring, cavpts, newelems): for ref, elem in zip(cavpts, newelems, strict=True): ring[ref] = elem if cavpts is None: cavpts = ring.get_refpts(RFCavity) else: cavpts = uint32_refpts(cavpts, len(ring)) qfactor = np.broadcast_to(qfactor, (len(cavpts),)) rshunt = np.broadcast_to(rshunt, (len(cavpts),)) new_elems = [] for ref, qf, rs in zip(cavpts, qfactor, rshunt, strict=True): cav = ring[ref] if not isinstance(cav, RFCavity): raise TypeError( "Beam loading can only be assigned" + "to an RFCavity element" ) new_elems.append(BeamLoadingElement.build_from_cav(cav, ring, qf, rs, **kwargs)) return apply(ring, cavpts, new_elems)
[docs] def remove_beamloading(ring, cavpts: Refpts = None, copy: bool | None = False): """Function to remove beam loading from a cavity element, the beam loading element is changed to a beam loading element that combines the energy kick from both the cavity and the resonator. Parameters: ring: Lattice object Keyword Arguments: cavpts (Refpts): refpts of the beam loading Elements. If None (default) apply to all elements copy: If True, returns a shallow copy of ring with new cavity elements. Otherwise, modify ring in-place """ @make_copy(copy) def apply(ring, cavpts, newelems): for ref, elem in zip(cavpts, newelems, strict=True): ring[ref] = elem if cavpts is None: cavpts = ring.get_refpts(BeamLoadingElement) else: cavpts = uint32_refpts(cavpts, len(ring)) new_elems = [] for ref in cavpts: bl = ring[ref] if not isinstance(bl, BeamLoadingElement): raise TypeError("Cannot remove beam loading: " + "not a BeamLoadingElement") family_name = bl.FamName.replace("_BL", "") harm = np.round(bl.Frequency * ring.circumference / clight) new_elems.append( RFCavity( family_name, bl.Length, bl.Voltage, bl.Frequency, harm, bl.Energy, TimeLag=getattr(bl, "TimeLag", 0.0), PhaseLag=getattr(bl, "PhaseLag", 0.0), ) ) return apply(ring, cavpts, new_elems)
[docs] class BeamLoadingElement(RFCavity, Collective): """Class to generate a beamloading element, inherits from Element additional argument are ring, cavity, qfactor, rshunt. """ default_pass = {False: "DriftPass", True: "BeamLoadingCavityPass"} _conversions = dict( RFCavity._conversions, Rshunt=float, Qfactor=float, NormFact=float, PhaseGain=float, VoltGain=float, _beta=float, _wakefact=float, _nslice=int, ZCuts=lambda v: _array(v), _cavitymode=int, _nturns=int, _phis=float, _buffersize=int, _vbeam_phasor=lambda v: _array(v, shape=(2,)), _vbeam=lambda v: _array(v, shape=(2,)), _vcav=lambda v: _array(v, shape=(2,)), _vgen=lambda v: _array(v, shape=(4,)), ) def __init__( self, family_name: str, length: float, voltage: float, frequency: float, ring: Lattice, qfactor: float, rshunt: float, detune: float | None = 0.0, cavitymode: CavityMode | None = CavityMode.ACTIVE, fbmode: FeedbackMode | None = FeedbackMode.ONETURN, buffersize: int | None = 0, **kwargs, ): r""" Parameters: ring: Lattice object length: Length of the cavity voltage: Cavity voltage [V] frequency: Cavity frequency [Hz] qfactor: Q factor rshunt: Shunt impedance, [:math:`\Omega`]. Keyword Arguments: Nslice (int): Number of slices per bunch. Default: 101 Nturns (int): Number of turn for the wake field. Default: 1 ZCuts: Limits for fixed slicing, default is adaptive NormFact (float): Normalization factor detune [Hz] (float): Define how much to detune the cavity from resonance in unints of Hz cavitymode (CavityMode): Is cavity ACTIVE (default), PASSIVE or PASSIVE_SETVOLTAGE (Passive with a voltage feedback). For PASSIVE_SETVOLTAGE, the voltage setpoint is specified with passive_voltage passive_voltage [V] (float): Voltage setpoint with the passive cavity with feedback. PhaseGain (float): Used for cavity feedbacks. States the gain on the phase correction factor to be applied. VoltGain (float): Used for cavity feedbacks. States the gain on the phase correction factor to be applied. buffersize (int): Size of the history buffer for vbeam, vgen, vbunch (default 0) feedback_angle_offset: Fixed detuning from optimal tuning angle [rad]. For a negative slope of the RF voltage at the synchronous position, the optimum detuning is negative. Applying a positive feedback_angle_offset will therefore reduce the detuning. The reverse is true for positive RF slope. ts (float): The timelag of the synchronous particle in the full RF system [m]. If not specified, it will be calculated using get_timelag_fromU0. Defines the expected position of the beam to be used for the beam loading setpoints. fbmode (FeedbackMode): States the mode for the parameter calculation to be used for the loop. ONETURN (default) takes only the current turn, compared to WINDOW which takes a sliding window. windowlength (int): for WINDOW feedback mode, states the length [turns] for the sliding window. Must be smaller than buffersize. system_harmonic (float): Used to compute the nominal rf frequency for the given system. e.g. third of fourth harmonic of rf_frequency. If None, then will be computed to the nearest integer multiple of rf_frequency. Returns: bl_elem (Element): beam loading element """ kwargs.setdefault("PassMethod", self.default_pass[True]) if not isinstance(cavitymode, CavityMode): raise TypeError("cavitymode has to be an " + "instance of CavityMode") zcuts = kwargs.pop("ZCuts", None) ts = kwargs.pop("ts", None) self.system_harmonic = kwargs.pop( "system_harmonic", int(np.round(frequency / ring.rf_frequency)) ) self.detune = detune check_frequency = np.abs(frequency - self.system_harmonic * ring.rf_frequency) if check_frequency > 1.0: # 1 Hz is the limit for the float check error_string = ( "Cavity must be an integer of rf_frequency, otherwise" "the phi_s computation will be wrong. Please use the detune" "argument when adding beamloading to a cavity that is an" "integer harmonic." ) raise AtError(error_string) self.circumference = ring.circumference self.bunch_spos = ring.bunch_spos energy = ring.energy harmonic_number = self.system_harmonic * ring.harmonic_number self.feedback_angle_offset = kwargs.pop("feedback_angle_offset", 0) self.Rshunt = rshunt self.Qfactor = qfactor self.NormFact = kwargs.pop("NormFact", 1.0) self.PhaseGain = kwargs.pop("PhaseGain", 1.0) self.VoltGain = kwargs.pop("VoltGain", 1.0) self._cavitymode = int(cavitymode) if self._cavitymode == 1: if not isinstance(fbmode, FeedbackMode): err_string = ( "For an active cavity, fbmode has to be defined and an " "instance of FeedbackMode" ) raise TypeError(err_string) self._fbmode = int(fbmode) else: self._fbmode = 0 if self.detune == 0 and self._cavitymode == 3: err_string = ( "Cannot start passive cavity feedback from zero detuning." "You must decide at the beginning which polarity you want." "This problem arises because the loop does not know which " "polarity to take!" ) raise AtError(err_string) self._passive_vset = kwargs.pop("passive_voltage", 0.0) self._beta = ring.beta self._wakefact = -ring.circumference / (clight * ring.energy * ring.beta**3) self._nslice = kwargs.pop("Nslice", 101) self._nturns = kwargs.pop("Nturns", 1) self._nbunch = ring.nbunch self._turnhistory = None # Defined here to avoid warning self._vbunch = None self._buffersize = buffersize self._windowlength = kwargs.pop("windowlength", 0) if self._windowlength > self._buffersize: err_string = "The windowlength must be smaller than the buffersize" raise ValueError(err_string) self._vgen_buffer = np.zeros(1) self._vbeam_buffer = np.zeros(1) self._vbunch_buffer = np.zeros(1) if zcuts is not None: self.ZCuts = zcuts super().__init__( family_name, length, voltage, frequency, harmonic_number, energy, **kwargs ) if ts is None: _, ts = get_timelag_fromU0(ring) self._ts = ts self._phis = 2 * np.pi * self.Frequency * (-self._ts - self.TimeLag) / clight # The below is needed because atan2 returns phases between -pi and pi # this prevents a setpoint being provided that is impossible # to achieve while self._phis < -np.pi: self._phis += 2 * np.pi while self._phis > np.pi: self._phis -= 2 * np.pi self._vbeam_phasor = np.zeros(2) self._vbeam = np.zeros(2) self._vgen = np.zeros(4) cavity_voltage = self.Voltage if self._cavitymode == 3: cavity_voltage = self._passive_vset self._vcav = np.array([cavity_voltage, self._phis]) self.clear_history(ring=ring)
[docs] def is_compatible(self, other): return False
[docs] def clear_history(self, ring=None): if ring is not None: self._nbunch = ring.nbunch current = ring.beam_current nbunch = ring.nbunch self._vbunch = np.zeros((nbunch, 2), order="F") self._init_bl_params(current) tl = self._nturns * self._nslice * self._nbunch self._turnhistory = np.zeros((tl, 4), order="F") if self._buffersize > 0: self._vgen_buffer = np.zeros((4, self._buffersize), order="F") self._vbeam_buffer = np.zeros((2, self._buffersize), order="F") self._vbunch_buffer = np.zeros( (self._nbunch, 2, self._buffersize), order="F" )
def _init_bl_params(self, current): if (self._cavitymode == 1) and (current > 0.0): vb = 2 * current * self.Rshunt a = self.Voltage b = -vb * np.cos(self._phis) psi = np.arcsin(b / np.sqrt(a**2 + b**2)) if np.isnan(psi): psi = 0.0 warning_string = ( "Unusual cavity configuration found." "Setting initial psi to 0 to avoid NaNs" ) warnings.warn(AtWarning(warning_string), stacklevel=2) psi += self.feedback_angle_offset vgen = self.Voltage * np.cos(psi) - vb * np.cos(psi) * np.sin(self._phis) elif self._cavitymode in {2, 3}: vgen = 0 psi = np.arctan( 2 * self.Qfactor * (1 - self.Frequency / (self.Frequency + self.detune)) ) else: vgen = self.Voltage psi = 0 self._vbeam = np.array([2 * current * self.Rshunt * np.cos(psi), np.pi + psi]) self._vgen = np.array([vgen, psi + self._phis, psi, vgen / np.cos(psi)]) vbp_amp = 2 * current * self.Rshunt * np.cos(psi) vbp_phase = np.pi + psi vbp_complex = vbp_amp * np.cos(vbp_phase) + 1j * vbp_amp * np.sin(vbp_phase) self._vbeam_phasor = np.array([np.abs(vbp_complex), np.angle(vbp_complex)]) @property def Buffersize(self): return self._buffersize @Buffersize.setter def Buffersize(self, value): self._buffersize = value self.clear_history() @property def Vgen_buffer(self): """Stored generator voltage data.""" return self._vgen_buffer @property def Vbeam_buffer(self): """Stored beam induced voltage data.""" return self._vbeam_buffer @property def Vbunch_buffer(self): """Stored bunch induced voltage data.""" return np.moveaxis(self._vbunch_buffer, 0, -1) @property def Nslice(self): """Number of slices per bunch.""" return self._nslice @Nslice.setter def Nslice(self, nslice): self._nslice = nslice self.clear_history() @property def Nturns(self): """Number of turn for the wake field.""" return self._nturns @Nturns.setter def Nturns(self, nturns): self._nturns = nturns self.clear_history() @property def TurnHistory(self): """Turn history of the slices center of mass.""" return self._turnhistory @property def ResFrequency(self): """Resonator frequency.""" delta = ( self.Frequency * np.tan(self.Vgen[2]) / self.Qfactor ) ** 2 + 4 * self.Frequency**2 freqres = ( self.Frequency * np.tan(self.Vgen[2]) / self.Qfactor + np.sqrt(delta) ) / 2 return freqres @property def Vbeam(self): """Beam phasor (amplitude, phase).""" return self._vbeam @property def Vbunch(self): """Bunch phasor (amplitude, phase).""" return self._vbunch @property def Vcav(self): """Cavity phasor (amplitude, phase).""" return self._vcav @property def Vgen(self): """Generator phasor (amplitude, phase).""" return self._vgen @Vgen.setter def Vgen(self, value): self._vgen = value
[docs] @staticmethod def build_from_cav( cav: RFCavity, ring: Sequence, qfactor: float, rshunt: float, cavitymode: CavityMode | None = CavityMode.ACTIVE, buffersize: int | None = 0, **kwargs, ): r"""Function to build the BeamLoadingElement from a cavity the FamName, Length, Voltage, Frequency and HarmNumber are taken from the cavity element. Parameters: ring: Lattice object qfactor: Q factor rshunt: Shunt impedance, [:math:`\Omega`] Keyword Arguments: Nslice (int): Number of slices per bunch. Default: 101 Nturns (int): Number of turn for the wake field. Default: 1 ZCuts: Limits for fixed slicing, default is adaptive NormFact (float): Normalization factor cavitymode (CavityMode): type of beam loaded cavity ACTIVE (default) for a cavity with active compensation, or PASSIVE to only include the beam induced voltage buffersize (int): Size of the history buffer for vbeam, vgen, vbunch (default 0) Returns: bl_elem (Element): beam loading element """ _CAV_ATTRIBUTES = ["Length", "Voltage", "Frequency"] _EXCL_ATTRIBUTES = ["PassMethod", "Energy", "HarmNumber"] cav_attrs = dict(cav.items()) family_name = cav_attrs.pop("FamName") + "_BL" [cav_attrs.pop(k) for k in _EXCL_ATTRIBUTES] cav_args = [cav_attrs.pop(k, getattr(cav, k)) for k in _CAV_ATTRIBUTES] if cavitymode == CavityMode.PASSIVE: if cav_args[1] != 0.0: warnings.warn(AtWarning("Setting Cavity Voltage to 0"), stacklevel=2) cav_args[1] = 0.0 return BeamLoadingElement( family_name, *cav_args, ring, qfactor, rshunt, cavitymode=cavitymode, buffersize=buffersize, **cav_attrs, **kwargs, )
def __repr__(self): """Simplified __repr__ to avoid errors due to arguments not defined as attributes. """ att = {k: v for (k, v) in self.items() if not k.startswith("_")} return f"{self.__class__.__name__}({att})"