Source code for at.physics.harmonic_analysis

"""
Simplified version of harpy from
Jaime Coello Maria de Portugal - Martinez Vazquez
Github: https://github.com/pylhc/harpy
"""

from __future__ import annotations
import numpy
from warnings import warn
from scipy.fft import fft, fftfreq
from at.lattice import AtWarning
from at.lattice import AtError


__all__ = ['get_spectrum_harmonic', 'get_main_harmonic',
           'get_tunes_harmonic']


def _pad_signal(samples, pad_length):
    """ Pad signal and round to the next power of 2"""
    if pad_length is not None:
        length = len(samples)
        pad_length = (1 << (length + int(pad_length)
                            - 1).bit_length()) - length
        samples = numpy.pad(samples, (0, pad_length))
    return samples


def _interpolate_peak(complex_values):
    """This method interpolates the real frequency of the
       signal using the three highest peaks in the FFT.
    """
    k = numpy.argmax(numpy.abs(complex_values))
    rv = complex_values
    n = len(complex_values)
    delta = numpy.tan(numpy.pi / n) / (numpy.pi / n)
    kp = (k + 1) % n
    km = (k - 1) % n
    dk = rv[km] - rv[kp]
    sk = 2 * rv[k] - rv[km] - rv[kp]
    delta *= numpy.real(dk/sk)
    return (k + delta) / n


def _compute_coef(samples, freq):
    """
    Computes the coefficient of the Discrete Time Fourier
    Transform corresponding to the given frequency
    """
    n = len(samples)
    exponents = numpy.exp(-2j*numpy.pi * freq * numpy.arange(n))
    coef = numpy.sum(exponents * samples)
    return coef


def _interpolated_fft(samples, num_harmonics, fmin, fmax,
                      maxiter):
    """Compute the interpolated FFT of a signal"""
    rn = numpy.arange(len(samples))
    coefficients = numpy.zeros(num_harmonics, dtype=complex)
    frequencies = numpy.zeros(num_harmonics)

    nfound = 0
    niter = 0
    nmax = num_harmonics*maxiter

    while (nfound < num_harmonics) and (niter < nmax):
        fft_data = fft(samples)
        frequency = _interpolate_peak(fft_data)
        coefficient = _compute_coef(samples, frequency)
        if frequency >= fmin and frequency <= fmax:
            frequencies[nfound] = frequency
            coefficients[nfound] = coefficient
            nfound += 1
        samples = samples - coefficient * numpy.exp(2j*numpy.pi*frequency*rn)
        niter += 1

    if nfound < num_harmonics:
        msg = ('{0}/{1} harmonics found in '
               'requested range'.format(nfound, num_harmonics))
        warn(AtWarning(msg))
    if nfound == 0:
        msg = ('No harmonic found within range, '
               'consider extending it or increase maxiter')
        raise AtError(msg)

    coefficients, frequencies = zip(*sorted(zip(coefficients, frequencies),
                                    key=lambda tuple: numpy.abs(tuple[0]),
                                    reverse=True))
    return frequencies, coefficients


[docs]def get_spectrum_harmonic(cent: numpy.ndarray, method: str = 'interp_fft', num_harmonics: int = 20, hann: bool = False, fmin: float = 0, fmax: float = 1, maxiter: float = 10, pad_length=None) -> tuple[numpy.ndarray, numpy.ndarray]: """Frequency analysis of beam motion Parameters: cent: Centroid motions of the particle method: ``'interp_fft'`` or ``'interp_fft'``. Default: ``'interp_fft'`` num_harmonics: Number of harmonics to search for with interp_fft fmin: Lower bound for spectrum search with interp_fft fmax: Upper bound for spectrum search with interp_fft maxiter: Multiplies ``num_harmonics`` to define the max. number of iteration for the search hann: Turn on Hanning window. Default: :py:obj:`False`. Ignored for interpolated FFT pad_length Zero pad the input signal. Rounded to the higher power of 2 Ignored for interpolated FFT Returns: frequency (ndarray): (num_harmonics,) array of frequencies amplitude (ndarray): (num_harmonics,) array of amplitudes phase (ndarray): (num_harmonics,) array of phases """ lc = len(cent) # laskar kept for backward compatibility if method == 'interp_fft' or method == 'laskar': if hann: warn(AtWarning('Windowing not efficient for' 'interpolated FFT: ignored')) if pad_length is not None: warn(AtWarning('Padding not efficient for' 'interpolated FFT: ignored')) ha_tune, ha_amp = _interpolated_fft(cent, num_harmonics, fmin, fmax, maxiter) elif method == 'fft': if hann: cent *= numpy.hanning(lc) cent = _pad_signal(cent, pad_length) fft_data = fft(cent) ha_tune, ha_amp = fftfreq(len(fft_data)), fft_data else: raise ValueError('The method ' + method + ' is undefined') ha_phase = numpy.angle(numpy.array(ha_amp)) ha_amp = numpy.abs(numpy.array(ha_amp)) / lc ha_tune = numpy.array(ha_tune) return ha_tune, ha_amp, ha_phase
[docs]def get_main_harmonic(cents: numpy.ndarray, method: str = 'interp_fft', hann: bool = False, fmin: float = 0, fmax: float = 1, maxiter: float = 10, pad_length=None) -> numpy.ndarray: """Computes tunes, amplitudes and pahses from harmonic analysis Parameters: cents: Centroid motions of the particle method: ``'interp_fft'`` or ``'fft'``. Default: ``'interp_fft'`` fmin: Lower bound for tune search fmax: Upper bound for tune search maxiter: Maximum number of iterations for the search hann: Turn on Hanning window. Default: :py:obj:`False`. Ignored for interpolated FFT pad_length: Zero pad the input signal. Rounded to the higher power of 2 Ignored for interpolated FFT Returns: tunes (ndarray): numpy array of length len(cents), max of the spectrum within [fmin fmax] amplitude (ndarray): (len(cents), ) array of amplitudes corresponding to the tune phase (ndarray): (len(cents), ) array of phases corresponding to the tune """ def get_max_spectrum(freq, amp, phase, fmin, fmax, method): if method == 'interp_fft': return freq[0], amp[0], phase[0] msk = numpy.logical_and(freq >= fmin, freq <= fmax) amp = amp[msk] freq = freq[msk] phase = phase[msk] freq = freq[numpy.argmax(amp)] phase = phase[numpy.argmax(amp)] amp = numpy.amax(amp) return freq, amp, phase cents = numpy.array(cents) if cents.ndim > 1: npart = cents.shape[0] else: cents = [cents] npart = 1 tunes = numpy.zeros(npart) amps = numpy.zeros(npart) phases = numpy.zeros(npart) for i in range(npart): out = get_spectrum_harmonic(cents[i], num_harmonics=1, method=method, hann=hann, pad_length=pad_length, fmin=fmin, fmax=fmax, maxiter=maxiter) freq, amp, phase = out try: tunes[i], amps[i], phases[i] = get_max_spectrum(freq, amp, phase, fmin, fmax, method) except ValueError: tunes[i] = numpy.nan amps[i] = numpy.nan phases[i] = numpy.nan return tunes, amps, phases
[docs]def get_tunes_harmonic(cents: numpy.ndarray, method: str = 'interp_fft', hann: bool = False, fmin: float = 0, fmax: float = 1, maxiter: float = 10, pad_length=None, **kwargs) -> numpy.ndarray: """Computes tunes from harmonic analysis Parameters: cents: Centroid motions of the particle method: ``'interp_fft'`` or ``'fft'``. Default: ``'interp_fft'`` fmin: Lower bound for tune search fmax: Upper bound for tune search maxiter: Maximum number of iterations for the search hann: Turn on Hanning window. Default: :py:obj:`False`. Ignored for interpolated FFT pad_length: Zero pad the input signal. Rounded to the higher power of 2 Ignored for interpolated FFT Returns: tunes (ndarray): numpy array of length len(cents), max of the spectrum within [fmin fmax] """ num_harmonics = kwargs.pop('num_harmonics', 1) # Backward compatibility if num_harmonics != 1: msg = "num_harmonics is deprecated and ignored for tune calculation" warn(AtWarning(msg)) tunes, _, _ = get_main_harmonic(cents, method=method, hann=hann, fmin=fmin, fmax=fmax, pad_length=pad_length) return tunes