# Source code for airbornegeo.nav

import math

import geopandas as gpd
import numpy as np
import pandas as pd
import seaborn as sns
import shapely
from geographiclib.geodesic import Geodesic
from numpy.typing import NDArray
from shapely.geometry import LineString
from tqdm.autonotebook import tqdm

sns.set_theme()


def directional_velocity(
    data: pd.DataFrame,
    *,
    time_column: str = "unixtime",
    coordinate_column: str = "easting",
    groupby_column: str | None = None,
) -> pd.Series:
    """
    Calculate one component of velocity, which is the change in coordinate values
    divided by the change in time between each successive row in the dataframe. For
    example, if latitude in decimal degrees are provided via column `coordinate_column`
    and time in seconds is provided to `time_column`, this would return the latitudinal
    component of velocity in degrees per second. This assumes the data have been sorted
    by time, and that there are not flights which overlap in time. If there are, you can
    first sort by flight, then by time. For example, if you have columns 'flight' and
    'unixtime', you can accomplish this with
    `data = data.sort_values(["flight", "unixtime"])`. If groupby_column is provided,
    the dataframe will first be grouped by this. The returned units are based on the
    units provided by the coordinate_column and time_column.

    Parameters
    ----------
    data : pd.DataFrame
        Dataframe containing the data points to calculate the ground speed for,
        must have columns 'unixtime' and 'relative_distance'
    time_column : str
        name of the column containing the time in seconds
    coordinate_column : str
        name of the column containing the coordinates to use for calculating velocity
    groupby_column : str | None, optional
        Column name to group by before calculation, by default None

    Returns
    -------
    pd.Series
        The velocity component in units of the provided coordinate and time columns
    """
    data = data.copy()

    if groupby_column is None:
        return np.gradient(data[coordinate_column], data[time_column])

    # iterate through groups, append speeds, and concat
    groups = []
    for _segment_name, segment_data in tqdm(
        data.groupby(groupby_column), desc="Segments"
    ):
        groups.append(
            np.gradient(segment_data[coordinate_column], segment_data[time_column])
        )

    return np.concatenate(groups)


def ground_speed(
    data: pd.DataFrame,
    *,
    time_column: str = "unixtime",
    easting_column: str = "easting",
    northing_column: str = "northing",
    groupby_column: str | None = None,
) -> pd.Series:
    """
    TODO: do calculation forward for 1st points so they aren't 0

    Calculate the ground speed in meters per second: the derivative of the
    cumulative along-track distance with respect to time at each row.

    This assumes the data have been sorted by time, and that there are not
    flights which overlap in time. If there are, first sort by flight, then
    by time, e.g. `data = data.sort_values(["flight", "unixtime"])`. If
    `groupby_column` is provided, the dataframe is first grouped by it.

    Parameters
    ----------
    data : pd.DataFrame
        Dataframe containing the data points to calculate the ground speed
        for; must have the time, easting, and northing columns named below.
    time_column : str
        name of the column containing the time in seconds
    easting_column : str
        name of the column containing the easting coordinates in meters
    northing_column : str
        name of the column containing the northing coordinates in meters
    groupby_column : str | None, optional
        Column name to group by before calculation, by default None

    Returns
    -------
    pd.Series
        The groundspeed in units of meters per second
    """
    data = data.copy()

    # distance travelled along track so far, per row
    data["cumulative_distance"] = cumulative_distance(
        data,
        easting_column=easting_column,
        northing_column=northing_column,
        groupby_column=groupby_column,
    )

    if groupby_column is None:
        return np.gradient(data["cumulative_distance"], data[time_column])

    # differentiate within each group, then stitch the results back together
    segment_speeds = [
        np.gradient(segment["cumulative_distance"], segment[time_column])
        for _name, segment in tqdm(data.groupby(groupby_column), desc="Segments")
    ]
    return np.concatenate(segment_speeds)
# Superseded by vertical_acceleration() below (kept for reference).
# def _vertical_acceleration(
#     time: NDArray,
#     height: NDArray,
# ) -> NDArray:
#     """
#     Calculate the vertical acceleration between each successive set of points
#
#     Parameters
#     ----------
#     time : NDArray
#         array of the time values
#     height : NDArray
#         array of the height values
#
#     Returns
#     -------
#     NDArray
#         the vertical acceleration between each set of points
#     """
#     assert len(time) == len(height)
#
#     # shift the arrays by 1
#     time_lag = np.empty_like(time)
#     time_lag[:1] = np.nan
#     time_lag[1:] = time[:-1]
#
#     height_lag = np.empty_like(height)
#     height_lag[:1] = np.nan
#     height_lag[1:] = height[:-1]
#
#     # compute vertical velocity
#     vertical_vel = (height - height_lag) / (time - time_lag)
#
#     # shift arrays by 1
#     vertical_vel_lag = np.empty_like(vertical_vel)
#     vertical_vel_lag[:1] = np.nan
#     vertical_vel_lag[1:] = vertical_vel[:-1]
#
#     # compute vertical acceleration
#     return (vertical_vel - vertical_vel_lag) / (time - time_lag)
[docs] def vertical_acceleration( data: pd.DataFrame, *, time_column: str, height_column: str, groupby_column: str | None = None, time_threshold: float | None = None, smoothing_window: int | None = None, ) -> pd.Series: """ Calculate the 2nd derivative of height change with respect to time for each line. If there is a gap between points greater than time_threshold in seconds, the line will be split at the gap and the acceleration will be NaN for the points on either side of the gap. Parameters ---------- data : pd.DataFrame Dataframe containing the data points and must have columns set from time_column and height_column. time_column : str Column name to containing the time in seconds height_column : str Column name to containing the flight height in meters groupby_column : str | None, optional Column name to group by before calculation, by default None time_threshold : float Threshold in seconds for determining gaps in the data, where acceleration will be set to NaN smoothing_window : int, optional Window size in number of data points for smoothing each derivative after it's been calculated, by default None Returns ------- pd.Series Series containing the vertical acceleration in m/s^2 for each point in the dataframe. 
""" data = data.copy() col_list = [time_column, height_column] if groupby_column is not None: col_list.append(groupby_column) assert all(x in data.columns for x in col_list), ( f"dataframe must contain columns {col_list} " ) if (groupby_column is None) and (time_threshold is not None): # split data into segments where there is a gap in time greater than # time_threshold # Calculate time difference between each point data["tmp_time_diff"] = pd.to_timedelta(data[time_column].diff(), unit="s") # Create new subline when gap > time_threshold data["tmp_new_group"] = ( data["tmp_time_diff"] > pd.Timedelta(seconds=time_threshold) ).astype(int) # Cumulative sum per Line to generate subline number data["tmp_segment"] = data["tmp_new_group"].cumsum() # Drop helper columns if not needed data = data.drop(columns=["tmp_time_diff", "tmp_new_group"]) groupby_column = "tmp_segment" if (groupby_column is not None) and (time_threshold is not None): # split data into segments where there is a gap in time greater than # time_threshold # Calculate time difference between each point data["tmp_time_diff"] = pd.to_timedelta( data.groupby(groupby_column)[time_column].diff(), unit="s" ) # Create new subline when gap > time_threshold data["tmp_new_group"] = ( data["tmp_time_diff"] > pd.Timedelta(seconds=time_threshold) ).astype(int) # Cumulative sum per Line to generate subline number data["tmp_segment"] = data.groupby(groupby_column)["tmp_new_group"].cumsum() # Drop helper columns if not needed data = data.drop(columns=["tmp_time_diff", "tmp_new_group"]) groupby_column = [groupby_column, "tmp_segment"] if groupby_column is None: times = data[time_column] heights = data[height_column] vertical_vel = np.gradient(heights, times) vertical_accel = pd.Series(np.gradient(vertical_vel, times)) if smoothing_window is not None: return vertical_accel.rolling(window=smoothing_window, min_periods=1).mean() return vertical_accel # iterate through groups, append accelerations, and concat accels = [] for 
_segment_name, segment_data in tqdm( data.groupby(groupby_column), desc="Segments" ): times = segment_data[time_column] heights = segment_data[height_column] vertical_vel = np.gradient(heights, times) vertical_accel = pd.Series(np.gradient(vertical_vel, times)) if smoothing_window is not None: vertical_accel = vertical_accel.rolling( window=smoothing_window, min_periods=1 ).mean() accels.append(vertical_accel) return np.concatenate(accels)
def relative_track_ellipsoid(
    lat: NDArray,
    lon: NDArray,
) -> NDArray:
    """
    Calculate the track between each successive set of points in degrees
    clockwise from geographic north in the range 0 to 360. This uses the WGS84
    ellipsoid, making the results more accurate than
    :func:`relative_track_spheroid`.

    Parameters
    ----------
    lat : NDArray
        array of the latitude coordinate values in decimal degrees
    lon : NDArray
        array of the longitude coordinate values in decimal degrees

    Returns
    -------
    NDArray
        the track between each set of points in decimal degrees from
        geographic north
    """
    geod = Geodesic.WGS84

    tracks = []
    # Calculate tracks for all segments (N-1); azi1 is the forward azimuth at
    # the segment's start point.
    for i in range(len(lat) - 1):
        line = geod.Inverse(lat[i], lon[i], lat[i + 1], lon[i + 1])
        tracks.append(line["azi1"])

    # Duplicate the last calculated track so the output length matches the
    # input length.
    if tracks:
        tracks.append(tracks[-1])
    elif len(lat) == 1:
        # Handle single point case (no direction possible)
        tracks.append(None)

    # make tracks in range of 0 to 360
    return np.array(tracks) % 360


def relative_track_spheroid(
    lat: NDArray,
    lon: NDArray,
) -> NDArray:
    """
    Calculate the track between each successive set of points in degrees
    clockwise from geographic north in the range 0 to 360. This assumes the
    Earth is a sphere, which is less accurate than using an ellipsoid model.

    Parameters
    ----------
    lat : NDArray
        array of the latitude coordinate values in decimal degrees
    lon : NDArray
        array of the longitude coordinate values in decimal degrees

    Returns
    -------
    NDArray
        the track between each set of points in decimal degrees from
        geographic north
    """
    # ensure 1D
    lat = np.asarray(lat).ravel()
    lon = np.asarray(lon).ravel()
    assert len(lat) == len(lon)

    # convert degrees to radians
    lat, lon = np.deg2rad(lat), np.deg2rad(lon)

    # get difference of each successive row
    delta_lon = np.diff(lon)
    # start points (all except last)
    lat_start = lat[:-1]
    # end points (all except first)
    lat_end = lat[1:]

    # great-circle initial bearing formula
    y = np.sin(delta_lon) * np.cos(lat_end)
    x = np.cos(lat_start) * np.sin(lat_end) - np.sin(lat_start) * np.cos(
        lat_end
    ) * np.cos(delta_lon)

    # calculate track
    # BUG FIX: np.atan2 only exists in numpy >= 2.0; np.arctan2 is the
    # portable name with identical behavior.
    track_rad = np.arctan2(y, x)
    track_deg = np.rad2deg(track_rad)

    # Duplicate the first segment's track for the first point so the output
    # length matches the input length.
    # BUG FIX: previously inserted track_deg[1], which raised IndexError for
    # 2-point inputs and assigned the first point the track of the second
    # segment rather than its own adjacent segment.
    track_deg = np.insert(track_deg, 0, track_deg[0])

    # make tracks in range of 0 to 360
    return track_deg % 360
def track(
    data: pd.DataFrame,
    *,
    latitude_column: str,
    longitude_column: str,
    groupby_column: str | None = None,
    ellipsoid: bool = True,
) -> pd.Series:
    """
    Calculate the track between each successive row in a dataframe.

    Track is the angle from geographic north (positive clockwise) that an
    aircraft travels over the ground. This differs from heading or bearing,
    which is the angle the nose of the plane points and is affected by wind.
    If `groupby_column` is provided, the dataframe is first grouped by it.

    Parameters
    ----------
    data : pd.DataFrame
        Dataframe containing the latitude and longitude columns named below.
    latitude_column : str
        name of the column containing latitudes in decimal degrees
    longitude_column : str
        name of the column containing longitudes in decimal degrees
    groupby_column : str | None, optional
        Column name to group by before calculation, by default None
    ellipsoid : bool, optional
        if True (default), use the WGS84 ellipsoid model; otherwise assume a
        spherical Earth

    Returns
    -------
    pd.Series
        The track in degrees, in the range 0 to 360, positive clockwise from
        geographic north
    """
    # pick the geodesy model once, up front
    compute_track = relative_track_ellipsoid if ellipsoid else relative_track_spheroid

    if groupby_column is None:
        return compute_track(
            lat=data[latitude_column].values,
            lon=data[longitude_column].values,
        )

    # compute per group and stitch the results back together in group order
    per_segment = [
        compute_track(
            lat=segment[latitude_column].values,
            lon=segment[longitude_column].values,
        )
        for _name, segment in tqdm(data.groupby(groupby_column), desc="Segments")
    ]
    return np.concatenate(per_segment)
def _relative_distance( x: NDArray, y: NDArray, ) -> NDArray: """ Calculate the relative distance between each successive set of points Parameters ---------- x : NDArray array of the first coordinate values y : NDArray array of the second coordinate values Returns ------- NDArray the distance between each set of points """ assert len(x) == len(y) # shift the arrays by 1 x_lag = np.empty_like(x) x_lag[:1] = np.nan x_lag[1:] = x[:-1] y_lag = np.empty_like(y) y_lag[:1] = np.nan y_lag[1:] = y[:-1] # compute distance between each set of coordinates rel_dist = np.sqrt((x - x_lag) ** 2 + (y - y_lag) ** 2) # set first row distance to 0 rel_dist[0] = 0 return rel_dist
def relative_distance(
    data: pd.DataFrame,
    *,
    easting_column: str = "easting",
    northing_column: str = "northing",
    groupby_column: str | None = None,
) -> pd.Series:
    """
    Calculate the distance between successive points in a dataframe.

    This assumes the data have been sorted by time, and that there are not
    flights which overlap in time. If there are, you must first sort by
    flight, then by time, e.g.
    `data = data.sort_values(["flight", "unixtime"])`. If `groupby_column` is
    provided, the dataframe is first grouped by it so distances are not taken
    across group boundaries.

    Parameters
    ----------
    data : pandas.DataFrame
        Dataframe containing the data.
    easting_column : str
        name of the column containing the easting coordinates in meters
    northing_column : str
        name of the column containing the northing coordinates in meters
    groupby_column : str | None, optional
        Column name to group by before calculation, by default None

    Returns
    -------
    pandas.Series
        Returns a pandas Series of the relative distances which can be
        assigned to a new column.
    """
    required = [easting_column, northing_column]
    if groupby_column is not None:
        required.append(groupby_column)
    assert all(col in data.columns for col in required), (
        f"dataframe must contain columns {required} "
    )

    if groupby_column is None:
        return _relative_distance(
            data[easting_column].values, data[northing_column].values
        )

    # compute per group and stitch the results back together in group order
    per_segment = [
        _relative_distance(
            segment[easting_column].values, segment[northing_column].values
        )
        for _name, segment in tqdm(data.groupby(groupby_column), desc="Segments")
    ]
    return np.concatenate(per_segment)
def cumulative_distance(
    data: pd.DataFrame,
    *,
    easting_column: str = "easting",
    northing_column: str = "northing",
    groupby_column: str | None = None,
) -> pd.Series:
    """
    Calculate the cumulative distance along track in a dataframe.

    This assumes the data have been sorted by time, and that there are not
    flights which overlap in time. If there are, you can first sort by
    flight, then by time, e.g.
    `data = data.sort_values(["flight", "unixtime"])`. If `groupby_column` is
    provided, the dataframe is first grouped by it when computing step
    distances.

    NOTE(review): the cumulative sum runs over the full concatenated result,
    so with grouping the total does not reset to 0 at each group boundary
    (each group's first step distance is 0, so the sum simply carries over) —
    confirm this is the intended behavior for downstream consumers.

    Parameters
    ----------
    data : pandas.DataFrame
        Dataframe containing the data.
    easting_column : str
        name of the column containing the easting coordinates in meters
    northing_column : str
        name of the column containing the northing coordinates in meters
    groupby_column : str | None, optional
        Column name to group by before calculation, by default None

    Returns
    -------
    pandas.Series
        Returns a pandas Series of the cumulative distances which can be
        assigned to a new column.
    """
    # point-to-point step distances, then running total
    step_distances = relative_distance(
        data,
        easting_column=easting_column,
        northing_column=northing_column,
        groupby_column=groupby_column,
    )
    return step_distances.cumsum()
def _guessed_line_distance(segment: gpd.GeoDataFrame) -> pd.Series:
    """
    Cumulative distance along one line of points, measured from the line's
    leftmost corner rather than from the first row.

    Rotates the line horizontal (via its minimum rotated rectangle's azimuth),
    orders the points by the horizontal coordinate, accumulates distance in
    that order, then returns the distances re-indexed to the segment's
    original row index.
    """
    # turn point data into line
    line = gpd.GeoSeries(LineString(segment.geometry.tolist()))
    # find minimum rotated rectangle around line
    rect = line.iloc[0].minimum_rotated_rectangle
    # get angle of rotation; normalize so the rotation is at most 90 degrees
    angle = azimuth(rect)
    if 90 < angle <= 180:
        angle = angle - 180
    # rotate the line to be horizontal
    line_horizontal = line.rotate(angle, origin=shapely.centroid(rect))
    horizontal_df = line_horizontal.get_coordinates(
        index_parts=True,
        ignore_index=True,
    )
    # remember original row order so the result can be aligned back
    horizontal_df["original_index"] = segment.index
    # accumulate distance left-to-right along the horizontal line
    horizontal_df = horizontal_df.sort_values("x").reset_index(drop=True)
    horizontal_df["tmp"] = cumulative_distance(
        horizontal_df,
        easting_column="x",
        northing_column="y",
        groupby_column=None,
    )
    horizontal_df = horizontal_df.sort_values("original_index").set_index(
        "original_index"
    )
    return horizontal_df.tmp


def along_track_distance(
    data: pd.DataFrame,
    *,
    easting_column: str = "easting",
    northing_column: str = "northing",
    groupby_column: str | None = None,
    guess_start_position: bool = False,
) -> pd.Series:
    """
    Calculate the distances along track in meters.

    This assumes the data have been sorted by time, and that there are not
    flights which overlap in time. If there are, you can first sort by
    flight, then by time, e.g.
    `data = data.sort_values(["flight", "unixtime"])`. If `groupby_column` is
    provided, the dataframe is first grouped by it.

    Parameters
    ----------
    data : pd.DataFrame
        Dataframe containing the data points to calculate the distance along
        each line; must have a set geometry column when
        `guess_start_position` is True.
    easting_column : str
        name of the column containing the easting coordinates in meters
    northing_column : str
        name of the column containing the northing coordinates in meters
    groupby_column : str | None, optional
        Column name to group by before calculation, by default None
    guess_start_position : bool, optional
        If True, determine the start of the line not by the first row but by
        finding the leftmost corner of the line. This is useful if you don't
        have a time column and your data is not sorted by time.

    Returns
    -------
    pd.Series
        The along track distance in meters
    """
    if guess_start_position:
        assert isinstance(data, gpd.GeoDataFrame), (
            "if `guess_start_position` is True, `data` must be a geopandas geodataframe."
        )
        # Refactor: the rotate/sort/accumulate logic previously appeared twice
        # (once for the ungrouped case and once inside the group loop); it now
        # lives in _guessed_line_distance.
        if groupby_column is None:
            return _guessed_line_distance(data)

        data = data.copy()
        for segment_name, segment_data in tqdm(
            data.groupby(groupby_column), desc="Segments"
        ):
            data.loc[data[groupby_column] == segment_name, "tmp"] = (
                _guessed_line_distance(segment_data)
            )
        return data.tmp

    if groupby_column is None:
        return cumulative_distance(
            data,
            easting_column=easting_column,
            northing_column=northing_column,
            groupby_column=None,
        )

    # iterate through groups, append distances, and concat
    groups = [
        cumulative_distance(
            segment_data,
            easting_column=easting_column,
            northing_column=northing_column,
            groupby_column=None,
        )
        for _segment_name, segment_data in data.groupby(groupby_column)
    ]
    return np.concatenate(groups)
def _azimuth_between_points( point1: tuple[float, float], point2: tuple[float, float], ) -> float: """azimuth between 2 points (interval 0 - 180)""" angle = np.arctan2(point2[1] - point1[1], point2[0] - point1[0]) return np.degrees(angle) if angle > 0 else np.degrees(angle) + 180 # type: ignore[no-any-return] def _dist(a: tuple[float, float], b: tuple[float, float]) -> float: """distance between points""" return math.hypot(b[0] - a[0], b[1] - a[1]) def azimuth(mrr) -> float: # type: ignore[no-untyped-def] """azimuth of minimum_rotated_rectangle""" bbox = list(mrr.exterior.coords) axis1 = _dist(bbox[0], bbox[3]) axis2 = _dist(bbox[0], bbox[1]) if axis1 <= axis2: az = _azimuth_between_points(bbox[0], bbox[1]) else: az = _azimuth_between_points(bbox[0], bbox[3]) return az