Source code for scintillometry.metrics.calculations

"""Copyright 2023 Scintillometry Contributors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

=====

Calculates metrics from datasets.
"""

import kneed
import pandas as pd
from sklearn.linear_model import LinearRegression

from scintillometry.backend.constants import AtmosConstants
from scintillometry.backend.constructions import ProfileConstructor
from scintillometry.backend.derivations import DeriveScintillometer
from scintillometry.backend.iterations import IterationMost
from scintillometry.backend.transects import TransectParameters
from scintillometry.visuals.plotting import FigurePlotter
from scintillometry.backend.deprecations import Decorators


[docs]class MetricsTopography: """Calculate metrics for topographical data. Attributes: transect (TransectParameters): Inherited methods for calculating path heights. """ def __init__(self): super().__init__() self.transect = TransectParameters()
[docs] def get_path_height_parameters(self, transect, regime=None): """Get effective and mean path heights of transect. Computes effective and mean path heights of transect under stable and unstable conditions, and with no height dependency. Prints the effective and mean path height for the user-selected stability conditions. Select the stability conditions using ``--r, --regime <str>``. Args: transect (pd.DataFrame): Parsed path transect data. regime (str): Target stability condition. Default None. Returns: dict[str, tuple[np.floating, np.floating]]: Tuples of effective and mean path height |z_eff| and |z_mean| [m], with stability conditions as keys. """ z_params = self.transect.get_all_path_heights(path_transect=transect) self.transect.print_path_heights( z_eff=z_params[str(regime)][0], z_mean=z_params[str(regime)][1], stability=regime, ) return z_params
[docs]class MetricsFlux: """Calculate metrics for fluxes. Attributes: constants (AtmosConstants): Inherited atmospheric constants. plotting (FigurePlotter): Inherited methods for plotting figures. derivation (DeriveScintillometer): Inherited methods for deriving parameters and fluxes from scintillometer. construction (ProfileConstructor): Inherited methods for constructing vertical profiles. iteration (IterationMost): Inherited methods for MOST iterative method. """ def __init__(self): super().__init__() self.constants = AtmosConstants() self.plotting = FigurePlotter() self.derivation = DeriveScintillometer() self.construction = ProfileConstructor() self.iteration = IterationMost()
[docs] def construct_flux_dataframe( self, interpolated_data, z_eff, beam_wavelength=880, beam_error=20 ): """Compute sensible heat flux for free convection. Computes sensible heat flux for free convection, plots a comparison between the computed flux and the flux recorded by the scintillometer, and saves the figure to disk. Warning: this will overwrite existing values for |CT2| in <interpolated_data>. Args: interpolated_data (pd.DataFrame): Dataframe containing parsed and localised weather and scintillometer data with matching temporal resolution. z_eff (np.floating): Effective path height, z_eff| [m]. beam_wavelength (int): Transmitter beam wavelength, nm. Default 880 nm. beam_error (int): Transmitter beam error, nm. Default 20 nm. Keyword Args: beam_wavelength (int): Transmitter beam wavelength, nm. Default 880 nm. beam_error (int): Transmitter beam error, nm. Default 20 nm. Returns: pd.DataFrame: Interpolated dataframe with additional column for sensible heat flux under free convection, and derived values for |CT2| [|K^2m^-2/3|]. """ flux_data = self.derivation.compute_fluxes( input_data=interpolated_data, effective_height=z_eff[0], beam_params=(beam_wavelength, beam_error), ) return flux_data
[docs] def get_nearest_time_index(self, data, time_stamp): """Get timestamp of dataframe index nearest to input timestamp. Args: data (pd.DataFrame): Dataframe with DatetimeIndex. time_stamp (pd.Timestamp): Time of day. Returns: pd.Timestamp: Closest index to time stamp. """ nearest_index = data.index.get_indexer([time_stamp], method="nearest", limit=1) loc_idx = data.index[nearest_index] return loc_idx[0]
[docs] def append_vertical_variables(self, data): """Derives vertical measurements and appends them to input. For several days or more of data, the returned dictionary may be quite large. Args: data (dict): Must at least contain the keys "vertical" and "weather", where "vertical" is a dictionary of vertical measurements for temperature and humidity:: data = { "weather": weather_data, "timestamp": bls_time, "vertical": { "temperature": pd.DataFrame, "humidity": pd.DataFrame, } } Returns: dict: If the input dictionary contained a dictionary under the key "vertical" measurements, the dictionary under "vertical" is updated with vertical data for water vapour pressure, air pressure, mixing ratio, virtual temperature, mean sea-level pressure, and potential temperature. Otherwise, the dictionary is returned unmodified. """ if "vertical" in data: data["vertical"] = self.construction.get_vertical_variables( vertical_data=data["vertical"], meteo_data=data["weather"] ) return data
[docs] def match_time_at_threshold(self, series, threshold, lessthan=True, min_time=None): """Gets time index of first value in series exceeding threshold. Args: series (pd.Series): Time series of numeric variable. threshold (float): Threshold value. lessthan (bool): If True, finds first value less than threshold. Otherwise, returns first value greater than threshold. Default True. min_time (pd.Timestamp): Time indices below this threshold are discarded. Default None. Returns: pd.Timestamp: Local time of the first value in the series that is less than the given threshold. """ if min_time: series = series[series.index >= min_time] if lessthan: series_match = series[series.lt(threshold)] else: series_match = series[series.gt(threshold)] if not series_match.empty: match_time = series_match.dropna().index[0] else: match_time = None return match_time
[docs] def get_regression(self, x_data, y_data, intercept=True): """Performs regression on labelled data. Args: x_data (pd.Series): Labelled explanatory data. y_data (pd.Series): Labelled response data. intercept (bool): If True, calculate intercept (e.g. data is not centred). Default True. Returns: dict: Contains the fitted estimator for regression data, the coefficient of determination |R^2|, and predicted values for a fitted regression line. """ scatter_frame = pd.merge( x_data, y_data, left_index=True, right_index=True, sort=True ) scatter_frame = scatter_frame.dropna(axis=0) x_fit_data = scatter_frame.iloc[:, 0].values.reshape(-1, 1) y_fit_data = scatter_frame.iloc[:, 1].values.reshape(-1, 1) linear_regressor = LinearRegression(fit_intercept=intercept) estimator = linear_regressor.fit(x_fit_data, y_fit_data) score = estimator.score(x_fit_data, y_fit_data) predictions = linear_regressor.predict(x_fit_data) regression = { "fit": estimator, "score": score, "regression_line": predictions, } return regression
[docs] def get_elbow_point(self, series, min_index=None, max_index=None): """Calculate elbow point using Kneedle algorithm. Only supports convex curves. Noisier curves may have several elbow points, in which case the function selects the smallest acceptable index. Args: series (pd.Series): Numeric data following a convex curve. min_index (Any): Indices below this threshold are discarded. Default None. max_index (Any): Indices above this threshold are discarded. Default None. Returns: int: Integer index of elbow point. Returns `None` if no elbow point is found. """ if not max_index and not min_index: indices = series.index else: if not max_index: max_index = series.index[-1] if not min_index: min_index = series.index[0] indices = series.index[ (series.index >= min_index) & (series.index <= max_index) ] if series[indices[-1]] < series[indices[0]]: curve_direction = "decreasing" online_param = True else: curve_direction = "increasing" online_param = True knee = kneed.KneeLocator( series[indices], indices, S=1.5, curve="convex", online=online_param, direction=curve_direction, interp_method="interp1d", ) elbows = series[knee.all_elbows_y][ series[knee.all_elbows_y] < series[indices].mean() ] if not elbows.empty: elbow_index = min(elbows.index) elif knee.all_elbows_y: elbow_index = min(knee.all_elbows_y) else: elbow_index = None return elbow_index
[docs] def get_boundary_height(self, grad_potential, time_index, max_height=2000): """Estimate height of boundary layer from potential temperature. Estimates the height of the boundary layer by calculating the elbow point of the gradient potential temperature using the Kneedle algorithm, i.e. where the gradient potential temperature starts to weaken. It is not a substitute for visual inspection. Args: grad_potential (pd.DataFrame): Vertical measurements, gradient potential temperature, |Dtheta/Dz| [|Km^-1|]. time_index (pd.Timestamp): Local time at which to estimate boundary layer height. max_height (int): Cutoff height for estimating boundary layer height. Default 2000. Returns: int: Estimated boundary layer height, |z_BL| [m]. """ time_index = self.get_nearest_time_index( data=grad_potential, time_stamp=time_index ) curve = grad_potential.loc[time_index] # set minimum height to 50m to avoid discontinuity errors elbow = self.get_elbow_point(series=curve, min_index=50, max_index=max_height) if not elbow: bl_height = None elif elbow >= max_height: bl_height = None else: bl_height = elbow if not bl_height: print("Failed to estimate boundary layer height.") else: print(f"Estimated boundary layer height: {bl_height} m.") return bl_height
[docs] def compare_lapse_rates(self, air_temperature, saturated, unsaturated): """Compares parcel temperatures to find instability. Args: air_temperature (pd.DataFrame): Vertical measurements, temperature, |T| [K]. saturated (pd.DataFrame): Vertical measurements, saturated parcel temperature, |T_sat| [K]. unsaturated (pd.DataFrame): Vertical measurements, unsaturated parcel temperature, |T_unsat| [K]. Returns: tuple[pd.Series[bool], pd.Series[bool]]: Boolean series of absolute and conditional instability for heights |z|. """ heights = air_temperature.columns[ (air_temperature.columns > 0) & (air_temperature.columns <= 2000) ] absolute_instability = ( air_temperature[heights].gt(unsaturated[heights]).any(axis=1) ) conditional_instability = ( air_temperature[heights].lt(saturated[heights]).any(axis=1) & air_temperature[heights].gt(saturated[heights]).any(axis=1) ) | air_temperature[heights].gt(unsaturated[heights]).any(axis=1) return absolute_instability, conditional_instability
[docs] def plot_lapse_rates( self, vertical_data, dry_adiabat, local_time, bl_height=None, location="" ): """Plots comparison of lapse rates and boundary layer height. The figures are saved to disk. Args: vertical_data (dict): Vertical measurements for lapse rates and temperatures. dry_adiabat (float): Dry adiabatic lapse rate. local_time (pd.Timestamp): Local time of switch between stability conditions. bl_height (float): Boundary layer height, [m]. Default None. location (str): Location of data collection. Default empty string. Returns: list[tuple[plt.Figure, plt.Axes]]: Vertical profiles of lapse rates on a single axis, and vertical profiles of parcel temperatures on a single axis. If a boundary layer height is provided, vertical lines denoting its height are added to the figures. """ lapse_rates = { "environmental_lapse_rate": vertical_data["environmental_lapse_rate"], "moist_adiabatic_lapse_rate": vertical_data["moist_adiabatic_lapse_rate"], } round_time = self.get_nearest_time_index( data=next(iter(lapse_rates.values())), time_stamp=local_time ) fig_lapse, axes_lapse = self.plotting.plot_merged_profiles( dataset=lapse_rates, time_index=round_time, vlines={"dry_adiabatic_lapse_rate": dry_adiabat}, hlines={"boundary_layer_height": bl_height}, site=location, y_lim=2000, title="Temperature Lapse Rates", x_label=r"Lapse Rate, [Km$^{-1}$]", ) self.plotting.save_figure( figure=fig_lapse, timestamp=round_time, suffix=f"{round_time.strftime('%H%M')}_lapse_rates", ) parcel_temperatures = { "temperature": vertical_data["temperature"], "unsaturated_temperature": vertical_data["unsaturated_temperature"], "saturated_temperature": vertical_data["saturated_temperature"], } fig_parcel, axes_parcel = self.plotting.plot_merged_profiles( dataset=parcel_temperatures, time_index=round_time, hlines={"boundary_layer_height": bl_height}, site=location, y_lim=2000, title="Vertical Profiles of Parcel Temperature", x_label="Temperature, [K]", ) self.plotting.save_figure( figure=fig_parcel, timestamp=round_time, suffix=f"{round_time.strftime('%H%M')}_parcel_temperatures", ) return [(fig_lapse, axes_lapse), (fig_parcel, axes_parcel)]
[docs] def get_switch_time_vertical(self, data, method="static", ri_crit=0.25): """Gets local time of switch between stability conditions. Pass one of the following to the <method> argument: - **eddy**: eddy covariance (NotImplemented) - **static**: static stability from potential temperature profile - **bulk**: bulk Richardson number - **lapse**: temperature lapse rate - **brunt**: Brunt-Väisälä frequency (NotImplemented) Args: data (dict): Parsed and localised dataframes, containing vertical measurements to construct a potential temperature profile or calculate lapse rates. method (str): Method to calculate switch time. Default "static". ri_crit (float): Critical bulk Richardson number for CBL. Only used if `method = "bulk"`. For street canyons consider values between 0.5 - 1.0 [#zhao2020]_. Default 0.25 [#jericevic2006]_. Returns: pd.Timestamp: Local time of switch between stability conditions. Raises: NotImplementedError: Switch time algorithm not implemented for <method>. """ # static stability if method == "static" and "grad_potential_temperature" in data["vertical"]: heights = data["vertical"]["grad_potential_temperature"].columns[ data["vertical"]["grad_potential_temperature"].columns <= 2000 ] negative_grad = ( data["vertical"]["grad_potential_temperature"][heights] .lt(0) .any(axis=1) ) local_time = self.match_time_at_threshold( series=negative_grad, # since True == 1 threshold=0, lessthan=False, min_time=data["timestamp"], ) elif method == "bulk": # bulk richardson bulk_richardson = self.construction.get_bulk_richardson( potential_temperature=data["vertical"]["potential_temperature"], meteo_data=data["weather"], ) local_time = self.match_time_at_threshold( series=bulk_richardson, threshold=ri_crit, lessthan=True ) data["vertical"]["bulk_richardson"] = bulk_richardson elif method == "lapse": # lapse rates _, conditional_instability = self.compare_lapse_rates( air_temperature=data["vertical"]["temperature"], saturated=data["vertical"]["saturated_temperature"].dropna(), unsaturated=data["vertical"]["unsaturated_temperature"].dropna(), ) local_time = self.match_time_at_threshold( series=conditional_instability, threshold=0, lessthan=False, min_time=data["timestamp"], ) else: error_msg = f"Switch time algorithm not implemented for '{method}'." raise NotImplementedError(error_msg) return local_time
[docs] def get_switch_time(self, data, method="sun", local_time=None, ri_crit=0.25): """Gets local time of switch between stability conditions. To override automatic detection, pass one of the following to the <method> argument: - **eddy**: eddy covariance (NotImplemented) - **sun**: global irradiance (i.e. sunrise) - **static**: static stability from potential temperature profile - **bulk**: bulk Richardson number - **lapse**: temperature lapse rate - **brunt**: Brunt-Väisälä frequency (NotImplemented) To manually set the regime switch time, pass a localised timestamp or string to <local_time>. This overrides all other methods. Args: data (dict): Parsed and localised dataframes, containing data to construct a potential temperature profile, or eddy covariance data, or global irradiance. method (str): Method to calculate switch time. Default "sun". local_time (Union[str, pd.Timestamp]): Local time of switch between stability conditions. Overrides <method>. Default None. ri_crit (float): Critical bulk Richardson number for CBL. Only used if `method = "bulk"`. For street canyons consider values between 0.5 - 1.0 [#zhao2020]_. Default 0.25 [#jericevic2006]_. Returns: pd.Timestamp: Local time of switch between stability conditions. Raises: UnboundLocalError: No data to calculate switch time. Set <local_time> manually with `--switch-time`. """ if not local_time: weather_data = data["weather"] if isinstance(method, str): method = method.lower() if method == "sun" and "global_irradiance" in weather_data.keys(): print("Using global irradiance to calculate switch time.\n") local_time = self.match_time_at_threshold( series=weather_data["global_irradiance"], threshold=20, # ~sunrise lessthan=False, ) elif "vertical" in data: local_time = self.get_switch_time_vertical( data=data, method=method, ri_crit=ri_crit ) if not local_time: error_msg = ( "No data to calculate switch time.", "Set <local_time> manually with `--switch-time`.", ) raise UnboundLocalError(" ".join(error_msg)) elif isinstance(local_time, str): split_times = local_time.split(sep=":") start_time = data["timestamp"] local_time = start_time.replace( hour=int(split_times[0]), minute=int(split_times[1]) ) print(f"Stability conditions change at: {local_time.strftime('%H:%M %Z')}") return local_time
[docs] def plot_switch_time_stability(self, data, local_time, location="", bl_height=None): """Plot and save profiles of potential temperature and gradient. Args: data (dict): Contains dataframes for vertical profiles of potential temperature and optionally the gradient of potential temperature. local_time (pd.Timestamp): Local time of switch between stability conditions. location (str): Location of data collection. Default empty string. bl_height (int): Boundary layer height. Default None. Returns: list[tuple[plt.Figure, plt.Axes]]: Vertical profile of potential temperature. If the gradient potential temperature is also provided, the two vertical profiles are placed side-by-side in separate subplots. """ round_time = self.get_nearest_time_index( data=data["potential_temperature"], time_stamp=local_time ) mil_time = round_time.strftime("%H%M") fig, ax = self.plotting.plot_vertical_profile( vertical_data=data, name="potential_temperature", time_idx=round_time, site=location, y_lim=2000, hlines={"boundary_layer_height": bl_height}, ) self.plotting.save_figure( figure=fig, timestamp=local_time, suffix=f"{mil_time}_potential_temperature_2km", ) if "grad_potential_temperature" in data: fig, ax = self.plotting.plot_vertical_comparison( dataset=data, time_index=round_time, keys=["potential_temperature", "grad_potential_temperature"], site=location, hlines={"boundary_layer_height": bl_height}, ) self.plotting.save_figure( figure=fig, timestamp=local_time, suffix=f"{mil_time}_potential_temperature_profiles", ) fig_grad, _ = self.plotting.plot_vertical_profile( vertical_data=data, name="grad_potential_temperature", time_idx=round_time, site=location, y_lim=2000, hlines={"boundary_layer_height": bl_height}, ) self.plotting.save_figure( figure=fig_grad, timestamp=local_time, suffix=f"{mil_time}_gradient_potential_temperature_2km", ) return [(fig, ax)]
[docs] def calculate_switch_time( self, datasets, method="sun", switch_time=None, location="" ): """Calculates and plots local time of switch in stability. Optionally uses vertical measurements in a dictionary under `datasets["vertical"]` to plot potential temperature profiles. Args: datasets (dict): Parsed and localised dataframes, containing data to construct a potential temperature profile, or eddy covariance data, or global irradiance. method (str): Method to calculate switch time. Default "sun". switch_time (Union[str, pd.Timestamp]): Local time of switch between stability conditions. Overrides <method>. Default None. location (str): Location of data collection. Default empty string. Returns: pd.Timestamp: Local time of switch between stability conditions. """ switch_time = self.get_switch_time( data=datasets, method=method, local_time=switch_time ) if "vertical" in datasets: vertical_data = datasets["vertical"] if "grad_potential_temperature" in vertical_data: layer_height = self.get_boundary_height( grad_potential=vertical_data["grad_potential_temperature"], time_index=switch_time, max_height=2000, ) self.plot_switch_time_stability( data=vertical_data, local_time=switch_time, location=location, bl_height=layer_height, ) if "environmental_lapse_rate" in vertical_data: self.plot_lapse_rates( vertical_data=datasets["vertical"], dry_adiabat=self.constants.dalr, bl_height=layer_height, local_time=switch_time, location=location, ) return switch_time
[docs] def iterate_fluxes( self, z_parameters, datasets, most_id="an1988", algorithm="sun", switch_time=None, location="", ): """Compute sensible heat fluxes with MOST through iteration. Trades speed from vectorisation for more accurate convergence. Args: z_parameters (dict[str, tuple[float, float]): Tuples of effective and mean path height |z_eff| and |z_mean| [m], with stability conditions as keys. datasets (dict): Contains parsed, tz-aware dataframes, with at least |CT2|, wind speed, air density, and temperature. most_id (str): MOST coefficients for unstable and stable conditions. Default "an1988". algorithm (str): Method to calculate switch time. Default "sun". switch_time (Union[str, pd.Timestamp]): Local time of switch between stability conditions. Overrides <method>. Default None. location (str): Location of data collection. Default empty string. Returns: pd.DataFrame: Interpolated data with additional columns for Obukhov length |LOb|, sensible heat flux |H|, friction velocity |u*|, and temperature scale |theta*|. """ interpolated_data = datasets["interpolated"] switch_time = self.calculate_switch_time( datasets=datasets, method=algorithm, switch_time=switch_time, location=location, ) round_time = self.get_nearest_time_index( data=interpolated_data, time_stamp=switch_time ).strftime("%H:%M") iteration_stable = interpolated_data.iloc[ interpolated_data.index.indexer_between_time("00:00", round_time) ].copy(deep=True) iteration_stable = self.iteration.most_method( dataframe=iteration_stable, eff_h=z_parameters["stable"][0], stability="stable", coeff_id=most_id, ) iteration_unstable = interpolated_data.iloc[ interpolated_data.index.indexer_between_time(round_time, "23:59") ].copy(deep=True) iteration_unstable = self.iteration.most_method( dataframe=iteration_unstable, eff_h=z_parameters["unstable"][0], stability="unstable", coeff_id=most_id, ) iteration_data = pd.concat([iteration_stable, iteration_unstable], sort=True) return iteration_data
[docs] def plot_derived_metrics(self, derived_data, time_id, regime=None, location=""): """Plot and save derived fluxes. Args: derived_data (pd.DataFrame): Interpolated tz-aware dataframe with column for sensible heat flux under free convection. time_id (pd.Timestamp): Start time of scintillometer data collection. regime (str): Stability condition. Default None. location (str): Location of data collection. Default empty string. Returns: list[tuple[plt.Figure, plt.Axes]]: Time series comparing sensible heat fluxes under free convection to on-board software. """ fig_convection, ax_convection = self.plotting.plot_convection( dataframe=derived_data, stability=regime, site=location ) self.plotting.save_figure( figure=fig_convection, timestamp=time_id, suffix="free_convection" ) derived_plots = [(fig_convection, ax_convection)] return derived_plots
[docs] @Decorators.deprecated_argument( stage="pending", version="1.0.5", site_location="location" ) def plot_iterated_metrics(self, iterated_data, time_stamp, location=""): """Plots and saves iterated SHF, comparison to free convection. .. todo:: ST-126: Deprecate the argument `site_location` for `location`. Args: iterated_data (pd.DataFrame): TZ-aware with columns for sensible heat fluxes calculated for free convection |H_free|, and by MOST |H|. time_stamp (pd.Timestamp): Local time of data collection. location (str): Location of data collection. Default empty string. Returns: list[tuple[plt.Figure, plt.Axes]]: Time series of sensible heat flux calculated through MOST, and a comparison to sensible heat flux under free convection. """ shf_plot = self.plotting.plot_generic(iterated_data, "shf", site=location) self.plotting.save_figure( figure=shf_plot[0], timestamp=time_stamp, suffix="shf" ) comparison_plot = self.plotting.plot_comparison( df_01=iterated_data, df_02=iterated_data, keys=["H_free", "shf"], labels=["Free Convection", "Iteration"], site=location, ) self.plotting.save_figure( figure=comparison_plot[0], timestamp=time_stamp, suffix="shf_comp" ) return [shf_plot, comparison_plot]
[docs]class MetricsWorkflow(MetricsFlux, MetricsTopography): """Standard workflow for metrics.""" def __init__(self): super().__init__()
[docs] def calculate_standard_metrics( self, data, regime=None, most_name="an1988", method="sun", switch_time=None, location="", beam_wavelength=880, beam_error=20, **kwargs, ): """Calculates and plots metrics from wrangled data. This wrapper function: - Calculates effective path heights for all stability conditions. - Derives |CT2| and sensible heat flux for free convection. - Estimates the time when stability conditions change. - Calculates sensible heat flux using MOST. - Plots time series comparing sensible heat flux for free convection |H_free| to on-board software, time series of sensible heat flux calculated with MOST |H|, and a comparison to sensible heat flux for free convection. - Saves plots to disk. If this function is imported as a package, mock user arguments with an argparse.Namespace object. Args: data (dict): Contains BLS, weather, and transect dataframes, an interpolated dataframe at 60s resolution containing merged BLS and weather data, a pd.Timestamp object of the scintillometer's recorded start time, and optionally vertical measurements:: data = { "bls": bls_data, "weather": weather_data, "transect": transect_data, "interpolated": interpolated_data, "timestamp": bls_time, "vertical": { "temperature": pd.DataFrame, "humidity": pd.DataFrame, } } regime (str): Target stability condition. Default None. most_name (str): MOST coefficients for unstable and stable conditions. Default "an1988". method (str): Method to calculate switch time. Default "sun". switch_time (Union[str, pd.Timestamp]): Local time of switch between stability conditions. Overrides <method>. Default None. location (str): Location of data collection. Default empty string. beam_wavelength (int): Transmitter beam wavelength, nm. Default 880 nm. beam_error (int): Transmitter beam error, nm. Default 20 nm. Returns: dict: Input dictionary with additional keys "derivation", "iteration" for derived and iterated data, respectively. """ data_timestamp = data["timestamp"] z_params = self.get_path_height_parameters( transect=data["transect"], regime=regime ) # Compute free convection derived_dataframe = self.construct_flux_dataframe( interpolated_data=data["interpolated"], z_eff=z_params["None"], beam_wavelength=beam_wavelength, beam_error=beam_error, ) self.plot_derived_metrics( derived_data=derived_dataframe, time_id=data_timestamp, location=location, ) if "vertical" in data: data = self.append_vertical_variables(data=data) # Compute fluxes through iteration iterated_dataframe = self.iterate_fluxes( z_parameters=z_params, datasets=data, most_id=most_name, algorithm=method, switch_time=switch_time, location=location, ) self.plot_iterated_metrics( iterated_data=iterated_dataframe, time_stamp=data_timestamp, location=location, ) data["derivation"] = derived_dataframe data["iteration"] = iterated_dataframe return data
[docs] def compare_innflux(self, own_data, innflux_data, location=""): """Compares SHF and Obukhov lengths to innFLUX measurements. This wrapper function: - Plots time series comparing Obukhov lengths and sensible heat fluxes between an input dataframe and innFLUX measurements. - Saves plots to disk. If this function is imported as a package, mock user arguments with an argparse.Namespace object. Args: own_data (pd.DataFrame): Labelled data for SHF and Obukhov length. innflux_data (pd.DataFrame): Eddy covariance data from innFLUX. location (str): Location of data collection. Default empty string. Returns: list[tuple[plt.Figure, plt.Axes]]: Time series comparing Obukhov length and sensible heat flux to innFLUX measurements. """ data_timestamp = own_data.index[0] obukhov_plot = self.plotting.plot_innflux( iter_data=own_data, innflux_data=innflux_data, name="obukhov", site=location, ) self.plotting.save_figure( figure=obukhov_plot[0], timestamp=data_timestamp, suffix="innflux_obukhov" ) obukhov_regression = self.get_regression( x_data=own_data["obukhov"], y_data=innflux_data["obukhov"], intercept=True ) obukhov_regression_plot = self.plotting.plot_scatter( x_data=own_data["obukhov"], y_data=innflux_data["obukhov"], sources=["MOST Iteration", "innFLUX"], name="obukhov", score=obukhov_regression["score"], regression_line=obukhov_regression["regression_line"], site=location, ) self.plotting.save_figure( figure=obukhov_regression_plot[0], timestamp=data_timestamp, suffix="innflux_obukhov_regression", ) shf_plot = self.plotting.plot_innflux( iter_data=own_data, innflux_data=innflux_data, name="shf", site=location, ) self.plotting.save_figure( figure=shf_plot[0], timestamp=data_timestamp, suffix="innflux_shf" ) shf_regression = self.get_regression( x_data=own_data["obukhov"], y_data=innflux_data["obukhov"], intercept=True ) shf_regression_plot = self.plotting.plot_scatter( x_data=own_data["shf"], y_data=innflux_data["shf"], sources=["MOST Iteration", "innFLUX"], name="shf", score=shf_regression["score"], regression_line=shf_regression["regression_line"], site=location, ) self.plotting.save_figure( figure=shf_regression_plot[0], timestamp=data_timestamp, suffix="innflux_shf_regression", ) plots = [obukhov_plot, shf_plot, obukhov_regression_plot, shf_regression_plot] return plots
[docs] def compare_eddy(self, own_data, ext_data, source="innflux", location=""): """Compares data to an external source of eddy covariance data. Plots and saves time series comparing Obukhov lengths and sensible heat fluxes between an input dataframe and external eddy covariance measurements. If this function is imported as a package, mock user arguments with an argparse.Namespace object. Args: own_data (pd.DataFrame): Labelled data. ext_data (pd.DataFrame): Eddy covariance data from an external source. source (str): Data source of vertical measurements. Currently supports processed innFLUX data. Default "innflux". location (str): Location of data collection. Default empty string. Returns: list[tuple[plt.Figure, plt.Axes]]: Time series comparing Obukhov length and sensible heat flux to innFLUX measurements. Raises: NotImplementedError: <source> measurements are not supported. Use "innflux". """ if source.lower() == "innflux": eddy_plots = self.compare_innflux( own_data=own_data, innflux_data=ext_data, location=location ) else: error_msg = ( f"{source.title()} measurements are not supported. Use 'innflux'." ) raise NotImplementedError(error_msg) return eddy_plots