Source code for at.physics.harmonic_analysis

"""
Simplified version of harpy from
Jaime Coello Maria de Portugal - Martinez Vazquez
Github: https://github.com/pylhc/harpy
"""

from __future__ import annotations
import numpy
from warnings import warn
from scipy.fft import fft, fftfreq
from at.lattice import AtWarning
from at.lattice import AtError
import multiprocessing
from functools import partial


__all__ = ['get_spectrum_harmonic', 'get_main_harmonic',
           'get_tunes_harmonic']


def _pad_signal(samples, pad_length):
    """ Pad signal and round to the next power of 2"""
    if pad_length is not None:
        length = len(samples)
        pad_length = (1 << (length + int(pad_length)
                            - 1).bit_length()) - length
        samples = numpy.pad(samples, (0, pad_length))
    return samples


def _interpolate_peak(complex_values):
    """This method interpolates the real frequency of the
       signal using the three highest peaks in the FFT.
    """
    k = numpy.argmax(numpy.abs(complex_values))
    rv = complex_values
    n = len(complex_values)
    delta = numpy.tan(numpy.pi / n) / (numpy.pi / n)
    kp = (k + 1) % n
    km = (k - 1) % n
    dk = rv[km] - rv[kp]
    sk = 2 * rv[k] - rv[km] - rv[kp]
    delta *= numpy.real(dk/sk)
    return (k + delta) / n


def _compute_coef(samples, freq):
    """
    Computes the coefficient of the Discrete Time Fourier
    Transform corresponding to the given frequency
    """
    n = len(samples)
    exponents = numpy.exp(-2j*numpy.pi * freq * numpy.arange(n))
    coef = numpy.sum(exponents * samples)/n
    return coef


def _interpolated_fft(samples, num_harmonics, fmin, fmax,
                      maxiter):
    """Compute the interpolated FFT of a signal"""
    rn = numpy.arange(len(samples))
    coefficients = numpy.zeros(num_harmonics, dtype=complex)
    frequencies = numpy.zeros(num_harmonics)

    nfound = 0
    niter = 0
    nmax = num_harmonics * maxiter

    while (nfound < num_harmonics) and (niter < nmax):
        fft_data = fft(samples)
        frequency = _interpolate_peak(fft_data)
        coefficient = _compute_coef(samples, frequency)
        if (frequency >= fmin) and (frequency <= fmax):
            frequencies[nfound] = frequency
            coefficients[nfound] = coefficient
            nfound += 1
        samples = samples - coefficient * numpy.exp(2j*numpy.pi*frequency*rn)
        niter += 1

    if nfound == 0:
        msg = ('No harmonic found within range, '
               'consider extending it or increase maxiter')
        raise AtError(msg)
    elif nfound < num_harmonics:
        msg = ('{0}/{1} harmonics found in '
               'requested range'.format(nfound, num_harmonics))
        warn(AtWarning(msg))

    coefficients, frequencies = zip(*sorted(zip(coefficients, frequencies),
                                    key=lambda tp: numpy.abs(tp[0]),
                                    reverse=True))
    return frequencies, coefficients


[docs]def get_spectrum_harmonic(cent: numpy.ndarray, method: str = 'interp_fft', num_harmonics: int = 20, hann: bool = False, fmin: float = 0, fmax: float = 1, maxiter: float = 10, pad_length: float = None, remove_mean: bool = True) -> tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray]: """Frequency analysis of beam motion Parameters: cent: Centroid motions of the particle method: ``'interp_fft'`` or ``'interp_fft'``. Default: ``'interp_fft'`` num_harmonics: Number of harmonics to search for with interp_fft fmin: Lower bound for spectrum search with interp_fft fmax: Upper bound for spectrum search with interp_fft maxiter: Multiplies ``num_harmonics`` to define the max. number of iteration for the search hann: Turn on Hanning window. Default: :py:obj:`False`. Ignored for interpolated FFT pad_length Zero pad the input signal. Rounded to the higher power of 2 Ignored for interpolated FFT remove_mean: Remove the mean of the input signal. Default: :py:obj:`True`. Returns: frequency (ndarray): (num_harmonics,) array of frequencies amplitude (ndarray): (num_harmonics,) array of amplitudes phase (ndarray): (num_harmonics,) array of phases """ lc = len(cent) # laskar kept for backward compatibility if remove_mean: cent -= numpy.mean(cent) if method == 'interp_fft' or method == 'laskar': if hann: warn(AtWarning('Windowing not efficient for' 'interpolated FFT: ignored')) if pad_length is not None: warn(AtWarning('Padding not efficient for' 'interpolated FFT: ignored')) ha_tune, ha_amp = _interpolated_fft(cent, num_harmonics, fmin, fmax, maxiter) elif method == 'fft': if hann: cent *= numpy.hanning(lc) cent = _pad_signal(cent, pad_length) fft_data = fft(cent) ha_tune, ha_amp = fftfreq(len(fft_data)), fft_data else: raise ValueError('The method ' + method + ' is undefined') ha_phase = numpy.angle(numpy.array(ha_amp)) ha_amp = numpy.abs(numpy.array(ha_amp)) / lc ha_tune = numpy.array(ha_tune) return ha_tune, ha_amp, ha_phase
def _get_max_spectrum(freq, amp, phase, fmin, fmax): msk = (freq >= fmin) & (freq <= fmax) amp = amp[msk] freq = freq[msk] phase = phase[msk] tune = freq[numpy.argmax(amp)] phase = phase[numpy.argmax(amp)] amp = numpy.amax(amp) return tune, amp, phase def _get_main_single(cents, **kwargs): def get_hmain(cents): fmin = kwargs.get('fmin', 0) fmax = kwargs.get('fmax', 1) try: out = get_spectrum_harmonic(cents, **kwargs) freq, amp, phase = out tunes, amps, phases = _get_max_spectrum(freq, amp, phase, fmin, fmax) except AtError: msg = ('No harmonic found within range, ' 'consider extending it or increase maxiter') warn(AtWarning(msg)) tunes = numpy.nan amps = numpy.nan phases = numpy.nan except ValueError: msg = ('Invalid input vector provided') warn(AtWarning(msg)) tunes = numpy.nan amps = numpy.nan phases = numpy.nan return tunes, amps, phases cents = numpy.atleast_2d(cents) results = [get_hmain(c) for c in cents] return numpy.transpose(results) def _get_main_multi(cents, **kwargs): cents = numpy.array(cents) start_method = kwargs.pop('start_method', None) pool_size = kwargs.pop('pool_size', None) if cents.ndim > 1: npart = cents.shape[0] else: return _get_main_single(cents, **kwargs) if pool_size is None: pool_size = min(npart, multiprocessing.cpu_count()) ctx = multiprocessing.get_context(start_method) fun = partial(_get_main_single, **kwargs) with ctx.Pool(pool_size) as pool: results = pool.map(fun, cents) results = numpy.concatenate(results, axis=1) return results
[docs]def get_main_harmonic(cents: numpy.ndarray, method: str = 'interp_fft', hann: bool = False, fmin: float = 0, fmax: float = 1, num_harmonics: int = 1, maxiter: float = 10, pad_length=None, use_mp: bool = False, pool_size: int = None, start_method: str = None, remove_mean: bool = True) -> tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray]: """Computes tunes, amplitudes and phases from harmonic analysis The tune is defined as the harmonic with the maximum amplitude within the search range. Parameters: cents: Centroid motions of the particle method: ``'interp_fft'`` or ``'fft'``. Default: ``'interp_fft'`` fmin: Lower bound for tune search fmax: Upper bound for tune search num_harmonics: Number of harmonics to search for. Default=1. maxiter: Maximum number of iterations for the search hann: Turn on Hanning window. Default: :py:obj:`False`. Ignored for interpolated FFT pad_length: Zero pad the input signal. Rounded to the higher power of 2 Ignored for interpolated FFT use_mp (bool): Flag to activate multiprocessing (default: False) pool_size: number of processes used when *use_mp* is :py:obj:`True`. If None, ``min(npart,nproc)`` is used. It can be globally set using the variable *at.lattice.DConstant.patpass_poolsize* start_method: Python multiprocessing start method. Default None uses the OS default method. remove_mean: Remove the mean of the input signal. Default: :py:obj:`True`. Returns: tunes (ndarray): numpy array of length len(cents), max of the spectrum within [fmin fmax] amplitude (ndarray): (len(cents), ) array of amplitudes corresponding to the tune phase (ndarray): (len(cents), ) array of phases corresponding to the tune .. note:: * The tune is defined as the harmonic with the maximum amplitude within the search range ``(fmin, fmax)``. * In case a ``Nan`` is present in the input vector or the tune cannot be found within the range, the function returns ``NaN``. * For the method ``'interp_fft'``, harmonics are calculated iteratively starting from the maximum peak of the raw FFT. ``num_harmonics=1`` is the default, only the first harmonic is calculated. However, it is possible that the maximum of the interpolated FFT does not correspond to the maximum of the raw FFT, in which case ``num_harmonics`` has to be increased to get the correct peak. """ if use_mp: tunes, amps, phases = _get_main_multi(cents, num_harmonics=num_harmonics, method=method, hann=hann, pad_length=pad_length, fmin=fmin, fmax=fmax, maxiter=maxiter, pool_size=pool_size, start_method=start_method, remove_mean=remove_mean) else: tunes, amps, phases = _get_main_single(cents, num_harmonics=num_harmonics, method=method, hann=hann, pad_length=pad_length, fmin=fmin, fmax=fmax, maxiter=maxiter, remove_mean=remove_mean) return tunes, amps, phases
[docs]def get_tunes_harmonic(cents: numpy.ndarray, method: str = 'interp_fft', hann: bool = False, fmin: float = 0, fmax: float = 1, num_harmonics: int = 1, maxiter: float = 10, pad_length=None, use_mp: bool = False, pool_size: int = None, start_method: str = None, remove_mean: bool = True, **kwargs) -> numpy.ndarray: """Computes tunes from harmonic analysis. Parameters: cents: Centroid motions of the particle method: ``'interp_fft'`` or ``'fft'``. Default: ``'interp_fft'`` fmin: Lower bound for tune search fmax: Upper bound for tune search num_harmonics: Number of harmonics to search for. Default=1. maxiter: Maximum number of iterations for the search hann: Turn on Hanning window. Default: :py:obj:`False`. Ignored for interpolated FFT. pad_length: Zero pad the input signal. Rounded to the higher power of 2 Ignored for interpolated FFT use_mp (bool): Flag to activate multiprocessing (default: False) pool_size: number of processes used when *use_mp* is :py:obj:`True`. If None, ``min(npart,nproc)`` is used. It can be globally set using the variable *at.lattice.DConstant.patpass_poolsize* start_method: Python multiprocessing start method. Default None uses the OS default method. remove_mean: Remove the mean of the input signal. Default: :py:obj:`True`. Returns: tunes (ndarray): numpy array of length len(cents), max of the spectrum within [fmin fmax] .. note:: * The tune is defined as the harmonic with the maximum amplitude within the search range ``(fmin, fmax)``. * In case a ``Nan`` is present in the input vector or the tune cannot be found within the range, the function returns ``NaN``. * For the method ``'interp_fft'``, harmonics are calculated iteratively starting from the maximum peak of the raw FFT. ``num_harmonics=1`` is the default, only the first harmonic is calculated. However, it is possible that the maximum of the interpolated FFT does not correspond to the maximum of the raw FFT, in which case ``num_harmonics`` has to be increased to get the correct peak. """ tunes, _, _ = get_main_harmonic(cents, method=method, hann=hann, fmin=fmin, fmax=fmax, pad_length=pad_length, use_mp=use_mp, pool_size=pool_size, start_method=start_method, maxiter=maxiter, remove_mean=remove_mean, num_harmonics=num_harmonics) return tunes