Source code for scintillometry.backend.iterations

"""Copyright 2019-2023 Scintillometry Contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

=====

Iterative methods to calculate heat fluxes.

IterationMost is descended from an old Python port of a method initially
written in R by Dr. Helen Ward (2019), but has been completely
rewritten - the implementations and features available are fundamentally
different so the results from one should not be assumed identical to the
other.

The current method used by IterationMost is based on mathematical theory
available in:

    - Scintec AG (2022). Scintec Scintillometers Theory Manual,
      Version 1.05. Scintec AG, Rottenburg, Germany. [#scintec2022]_

    - Scintec AG (2008). Scintec Boundary Layer Scintillometer User
      Manual, Version 1.49. Scintec AG, Rottenburg, Germany.
      [#scintec2008]_

This method has been updated to resolve some inconsistencies between
the manuals, and has additional modifications to enhance performance and
broaden its use with different scintillometer models. Results may
therefore differ slightly between this and other iterative methods.

TODO:
    - ST-45: Implement MM5 method (Zhang and Anthes, 1982).
    - ST-46: Implement Li method (Li et al., 2014; 2015).
"""

import time
import warnings

import mpmath
import numpy as np
import tqdm

from scintillometry.backend.constants import AtmosConstants


[docs]class IterationMost: """Classic MOST Iteration. A detailed description of the iterative scheme is available in the Scintec Scintillometers Theory Manual, Version 1.05. [#scintec2022]_ This iterative method resolves some inconsistencies between different scintillometer manuals. Results may therefore differ slightly with other third-party implementations. Attributes: constants (AtmosConstants): Inherited atmospheric constants. """ def __init__(self): super().__init__() mpmath.dps = 30 mpmath.pretty = True self.constants = AtmosConstants()
[docs] def momentum_stability_unstable(self, obukhov, z): """Integrated stability function for momentum, unstable. Uses formula from Paulson (1970) [#paulson1970]_. Apply when |LOb| < 0. Args: obukhov (float): Obukhov length, |LOb| [m]. z (float): Height, |z| [m]. Returns: mpmath.mpf: Integrated stability function for momentum, |Psi_m| [0-Dim]. """ x = (1 - 16 * (z / obukhov)) ** (1 / 4) psi_momentum = ( 2 * mpmath.ln((1 + x) / 2) + mpmath.ln((1 + x**2) / 2) - 2 * mpmath.atan(x) + mpmath.pi / 2 ) return psi_momentum
[docs] def momentum_stability_stable(self, obukhov, z): """Integrated stability function for momentum, stable. Apply when |LOb| > 0. Args: obukhov (float): Obukhov length, |LOb| [m]. z (float): Height, |z| [m]. Returns: mpmath.mpf: Integrated stability function for momentum, |Psi_m| [0-Dim]. """ psi_momentum = (-5) * z / obukhov return mpmath.mpmathify(psi_momentum)
[docs] def momentum_stability(self, obukhov, z): """Integrated stability function for momentum. Calculates integrated stability function for momentum under stable or unstable conditions. No implementation for neutral stability. Args: obukhov (float): Obukhov length, |LOb| [m]. z (float): Height, |z| [m]. Returns: mpmath.mpf: Integrated stability function for momentum, |Psi_m| [0-Dim]. """ if obukhov > 0: psi_m = self.momentum_stability_stable(obukhov, z) else: psi_m = self.momentum_stability_unstable(obukhov, z) return psi_m
[docs] def get_most_coefficients(self, most_id="an1988", most_type="ct2"): """Returns MOST coefficients for similarity functions. Implemented MOST coefficients: - **an1988**: E.L. Andreas (1988) [#andreas1988]_ - **li2012**: D. Li et al. (2012) [#li2012]_ - **wy1971**: Wyngaard et al. (1971) [#wyngaard1971]_ - **wy1973**: Wyngaard et al. (1973) in Kooijmans and Hartogensis (2016) [#kooijmans2016]_ - **ma2014**: Maronga et al. (2014) [#maronga2014]_ - **br2014**: Braam et al. (2014) [#braam2014]_ Braam et al. (2014) and Maronga et al. (2014) do not provide coefficients for stable conditions, so gradient functions will evaluate to zero for stable conditions. Args: most_id (str): MOST coefficients for unstable and stable conditions. Default "an1988". most_type (str): MOST function. Default "ct2". Returns: list: MOST coefficients formatted as ``[(unstable, unstable), (stable, stable)]`` Raises: NotImplementedError: MOST coefficients are not implemented for <most_id>. NotImplementedError: MOST coefficients are not implemented for functions of <most_type>. """ if most_type.lower() != "ct2": raise NotImplementedError( f"MOST coefficients are not implemented for functions of {most_type}." ) elif most_id.lower() not in self.constants.most_coeffs_ft: raise NotImplementedError( f"MOST coefficients are not implemented for {most_id}." ) else: coeffs = self.constants.most_coeffs_ft[most_id.lower()] return coeffs
[docs] def similarity_function(self, obukhov, z, coeffs, stable): """Similarity function of |CT2|. Adapted from Wyngaard et al. (1971) [#wyngaard1971]_; Thiermann and Grassl (1992) [#thiermann1992]_; Kooijmans and Hartogensis (2016) [#kooijmans2016]_. Args: obukhov (float): Obukhov length, |LOb| [m]. z (float): Effective height, |z| [m]. stable (bool): True for stable conditions, otherwise False. coeffs (list): MOST coefficients for unstable and stable conditions. Returns: float: Similarity function of |CT2|, |f_CT2|. """ if obukhov == 0: # otherwise zero division obukhov += 1e-10 # Standard shape from Kooijmans and Hartogensis (2016) if not stable: f_ct2 = coeffs[0][0] * (1 - coeffs[0][1] * (z / obukhov)) ** (-2 / 3) else: f_ct2 = coeffs[1][0] * (1 + coeffs[1][1] * ((z / obukhov) ** (2 / 3))) return f_ct2
[docs] def calc_theta_star(self, ct2, f_ct2, z, stable): """Calculate temperature scale. Args: ct2 (float): Structure parameter of temperature, |CT2| [|K^2m^-2/3|]. f_ct2 (float): MOST function of |CT2|, |f_CT2|. z (float): Effective height, |z| [m]. stable (bool): True for stable conditions, otherwise False. Returns: mpmath.mpf: Temperature scale, |theta*|. """ theta_star = mpmath.sqrt(ct2 * (z ** (2 / 3)) / f_ct2) if not stable: theta_star = -theta_star return theta_star
[docs] def calc_u_star(self, u, z_u, r_length, o_length): """Calculates friction velocity. Args: u (float): Wind speed, |u| [|ms^-1|]. z_u (float): Height of wind speed measurement including displacement (z-d), |z_u| [m]. r_length (float): Roughness length, |z_0| [m]. o_length (float): Obukhov length |LOb| [m]. Returns: mpmath.mpf: Friction velocity |u*|. """ friction_velocity = ( self.constants.k * u / ( mpmath.ln(z_u / r_length) - self.momentum_stability(o_length, z=z_u) + self.momentum_stability(o_length, z=r_length) ) ) return friction_velocity
[docs] def calc_obukhov_length(self, temp, u_star, theta_star): """Calculate Obukhov length. Args: temp (np.floating): Air temperature in Kelvin, |T| [K]. u_star (mpmath.mpf): Friction velocity |u*|. theta_star (mpmath.mpf): Temperature scale, |theta*|. Returns: mpmath.mpf: Obukhov Length, |LOb| [m]. """ obukhov_length = (temp * (u_star**2)) / ( self.constants.g * self.constants.k * theta_star ) return obukhov_length
[docs] def check_signs(self, stable_flag, dataframe): """Warns if sign of variable doesn't match expected sign. Args: stable_flag (bool): True for stable conditions, otherwise False. dataframe (pd.DataFrame): Dataframe with sensible heat flux and Obukhov lengths. Warns: UserWarning: Sign of Obukhov lengths should be opposite of SHFs. UserWarning: SHFs should never be positive for stable conditions. UserWarning: Obukhov lengths should never be negative for stable conditions. """ if stable_flag and (dataframe["shf"] > 0).any(): msg = "SHFs should never be positive for stable conditions." warnings.warn(UserWarning(msg)) if stable_flag and (dataframe["obukhov"] < 0).any(): msg = "Obukhov lengths should never be negative for stable conditions." warnings.warn(UserWarning(msg)) # use "not" with <= to avoid warnings when comparing zeros. if not ((dataframe["shf"] <= 0).any()) == ((dataframe["obukhov"] >= 0).any()): msg = "Sign of Obukhov lengths should be opposite of SHFs." warnings.warn(UserWarning(msg))
[docs] def most_iteration(self, dataframe, zm_bls, stable_flag, most_coeffs): """Iterative MOST method. A detailed description of the iterative method is available in: - Scintec AG (2022). *Scintec Scintillometers Theory Manual*, Version 1.05. Scintec AG, Rottenburg, Germany. [#scintec2022]_ Iteration until convergence is slower than vectorisation, but more accurate. If a value never converges, the iteration stops after ten runs. Args: dataframe (pd.DataFrame): Parsed, localised dataframe row containing at least |CT2|, wind speed, air density, and temperature. zm_bls (float): Effective height of scintillometer. stable_flag (bool): Stability conditions. If true, assumes stable conditions, otherwise assumes unstable conditions. most_coeffs (list): MOST coefficients for unstable and stable conditions. Returns: pd.DataFrame: Dataframe with additional columns for Obukhov lengths, sensible heat fluxes, frictional velocity, and temperature scale. """ z0 = zm_bls * 0.1 # estimated roughness length iter_step = 0 # enforce limit if no convergence obukhov_diff = 1 # converges around 5 iterations, but 10 is safer while not (np.abs(obukhov_diff) < 0.1 or iter_step >= 10): dataframe["f_ct2"] = self.similarity_function( obukhov=dataframe["obukhov"], z=zm_bls, stable=stable_flag, coeffs=most_coeffs, ) dataframe["u_star"] = self.calc_u_star( u=dataframe["wind_speed"], z_u=zm_bls, r_length=z0, o_length=dataframe["obukhov"], ) dataframe["theta_star"] = self.calc_theta_star( ct2=dataframe["CT2"], f_ct2=dataframe["f_ct2"], z=zm_bls, stable=stable_flag, ) obukhov_tmp = self.calc_obukhov_length( temp=dataframe["temperature_2m"], u_star=dataframe["u_star"], theta_star=dataframe["theta_star"], ) # Use difference to determine convergence obukhov_diff = np.abs(obukhov_tmp - dataframe["obukhov"]) dataframe["obukhov"] = obukhov_tmp if mpmath.isnan(dataframe["obukhov"]): # iteration unnecessary if NaN iter_step = 10 iter_step += 1 # Calculate SHF after iteration dataframe["shf"] = ( (-1) * dataframe["rho_air"] * self.constants.cp * dataframe["u_star"] * dataframe["theta_star"] ) if stable_flag: dataframe["shf"] = -mpmath.fabs(dataframe["shf"]) return dataframe
[docs] def most_method(self, dataframe, eff_h, stability, coeff_id="an1988"): """Calculate Obukhov length and sensible heat fluxes with MOST. Iteration is more accurate but slower than vectorisation. Implemented MOST coefficients: - **an1988**: E.L. Andreas (1988) [#andreas1988]_ - **li2012**: D. Li et al. (2012) [#li2012]_ - **wy1971**: Wyngaard et al. (1971) [#wyngaard1971]_ - **wy1973**: Wyngaard et al. (1973) in Kooijmans and Hartogensis (2016) [#kooijmans2016]_ - **ma2014**: Maronga et al. (2014) [#maronga2014]_ - **br2014**: Braam et al. (2014) [#braam2014]_ Braam et al. (2014) and Maronga et al. (2014) do not provide coefficients for stable conditions, so gradient functions will evaluate to zero for stable conditions. Args: dataframe (pd.DataFrame): Parsed, localised dataframe containing at least |CT2|, wind speed, air density, and temperature. eff_h (float): Effective path height. stability (str): Stability conditions. Either "stable" or "unstable". coeff_id (str): ID of MOST coefficients for unstable and stable conditions. Default "an1988". Returns: pd.DataFrame: Dataframe with additional columns for Obukhov lengths, sensible heat fluxes, frictional velocity, and temperature scale. """ coeffs = self.get_most_coefficients(most_id=coeff_id, most_type="ct2") # Prepare dataframe iteration = dataframe.copy(deep=True) if stability == "unstable": iteration["obukhov"] = -100 # unstable conditions stable = False else: iteration["obukhov"] = 200 # stable conditions stable = True print(f"Started iteration ({stability})...") start = time.time() tqdm.tqdm.pandas() # register pd.Series.map_apply with tqdm iteration = iteration.progress_apply( lambda x: self.most_iteration( x, zm_bls=eff_h, stable_flag=stable, most_coeffs=coeffs ), axis=1, ) end = time.time() self.check_signs(stable_flag=stable, dataframe=iteration) print(f"Completed iteration ({stability}) in {end - start:>0.2f}s.\n") return iteration