Source code for airbornegeo.nav

import math

import geopandas as gpd
import numpy as np
import pandas as pd
import seaborn as sns
import shapely
from geographiclib.geodesic import Geodesic
from numpy.typing import NDArray
from shapely.geometry import LineString
from tqdm.autonotebook import tqdm

sns.set_theme()


def ground_speed(
    data: pd.DataFrame | gpd.GeoDataFrame,
) -> pd.Series:
    """
    Calculate the ground speed in meters. This is change in distance divided by the
    change in time between each successive row in the dataframe. This assumes the data
    have been sorted by time, and that there are not flights which overlap in time. If
    there are, you can first sort by flight, then by time. For example, if you have
    columns 'flight' and 'unixtime', you can accomplish this with
    `df = df.sort_values(["flight", "unixtime"])`. The column 'relative_distance' can be
    created with the ::func:`relative_distance`. This assumes you have a column named
    'unixtime'.

    Parameters
    ----------
    data : pd.DataFrame | gpd.GeoDataFrame
        Dataframe containing the data points to calculate the ground speed for,
        must have columns 'unixtime' and 'relative_distance'.
    Returns
    -------
    pd.Series
        The groundspeed in units of meters per second
    """

    return data.relative_distance / data.unixtime.diff()


def relative_track_ellipsoid(
    lat: NDArray,
    lon: NDArray,
) -> NDArray:
    """
    Calculate the track between each successive set of points in degrees clockwise from
    geographic north in the range 180 to -180. This uses the WGS84 ellipsoid, make the
    results more accurate the ::func:`relative_track_sheroid`.

    Parameters
    ----------
    lat : NDArray
        array of the latitude coordinate values in decimal degrees
    lon : NDArray
        array of the longitude coordinate values in decimal degrees

    Returns
    -------
    NDArray
        the track between each set of points in decimal degrees from geographic north
    """

    geod = Geodesic.WGS84
    bearings = []
    # Calculate bearings for all segments (N-1)
    for i in range(len(lat) - 1):
        line = geod.Inverse(lat[i], lon[i], lat[i + 1], lon[i + 1])
        bearings.append(line["azi1"])

    # Duplicate the last calculated bearing if the list isn't empty
    if bearings:
        bearings.append(bearings[-1])
    elif len(lat) == 1:
        # Handle single point case (no direction possible)
        bearings.append(None)

    return np.array(bearings)


def relative_track_spheroid(
    lat: NDArray,
    lon: NDArray,
) -> NDArray:
    """
    Calculate the track between each successive set of points in degrees clockwise from
    geographic north in the range 180 to -180. This assumes the Earth is a sphere, which
    is less accurate the using an ellipsoid model.

    Parameters
    ----------
    lat : NDArray
        array of the latitude coordinate values in decimal degrees
    lon : NDArray
        array of the longitude coordinate values in decimal degrees

    Returns
    -------
    NDArray
        the track between each set of points in decimal degrees from geographic north
    """

    # ensure 1D
    lat = np.asarray(lat).ravel()
    lon = np.asarray(lon).ravel()

    assert len(lat) == len(lon)

    # convert degrees to radians
    lat, lon = np.deg2rad(lat), np.deg2rad(lon)

    # git difference of each successive row
    delta_lon = np.diff(lon)

    # start points (all except last)
    lat_start = lat[:-1]
    # end points (all except first)
    lat_end = lat[1:]

    y = np.sin(delta_lon) * np.cos(lat_end)
    x = np.cos(lat_start) * np.sin(lat_end) - np.sin(lat_start) * np.cos(
        lat_end
    ) * np.cos(delta_lon)

    # calculate track
    track_rad = np.atan2(y, x)
    track_deg = np.rad2deg(track_rad)
    return np.insert(track_deg, 0, np.nan)


def track(
    data: pd.DataFrame,
    *,
    latitude_column: str,
    longitude_column: str,
    groupby_column: str | None = None,
    ellipsoid: bool = True,
) -> pd.Series:
    """
    Calculate the track between each successive row in a dataframe. Track is the angle
    from geographic north (positive clockwise) that and aircraft travels over the
    ground. This is different to the bearing, which is the angle the nose of the plane
    points, which is affected by wind. If groupby_column is provided, the dataframe will
    first be grouped by this.

    Parameters
    ----------
    data : pd.DataFrame
        Dataframe containing the data points and must have columns 'easting' and
        'northing'.
    groupby_column : str | None, optional
        Column name to group by before sorting by time, by default None

    Returns
    -------
    pd.Series
        The track in degrees, -180 to 180, positive clockwise from geographic north
    """
    track_func = relative_track_ellipsoid if ellipsoid else relative_track_spheroid

    if groupby_column is None:
        return track_func(
            lat=data[latitude_column].values,
            lon=data[longitude_column].values,
        )

    # iterate through groups, append tracks, and concat
    tracks = []
    for _segment_name, segment_data in tqdm(
        data.groupby(groupby_column), desc="Segments"
    ):
        calc_track = track_func(
            lat=segment_data[latitude_column].values,
            lon=segment_data[longitude_column].values,
        )
        tracks.append(calc_track)
    return np.concatenate(tracks)


def _relative_distance(
    x: NDArray,
    y: NDArray,
) -> NDArray:
    """
    Calculate the relative distance between each successive set of points

    Parameters
    ----------
    x : NDArray
        array of the first coordinate values
    y : NDArray
        array of the second coordinate values

    Returns
    -------
    NDArray
        the distance between each set of points
    """
    assert len(x) == len(y)

    # shift the arrays by 1
    x_lag = np.empty_like(x)
    x_lag[:1] = np.nan
    x_lag[1:] = x[:-1]

    y_lag = np.empty_like(y)
    y_lag[:1] = np.nan
    y_lag[1:] = y[:-1]

    # compute distance between each set of coordinates
    rel_dist = np.sqrt((x - x_lag) ** 2 + (y - y_lag) ** 2)

    # set first row distance to 0
    rel_dist[0] = 0
    return rel_dist


[docs] def relative_distance( data: pd.DataFrame, ) -> pd.Series: """ Calculate distance between successive points in a dataframe. This assumes the data have been sorted by time, and that there are not flights which overlap in time. If there are, you can first sort by flight, then by time. For example, if you have columns 'flight' and 'unixtime', you can accomplish this with `df = df.sort_values(["flight", "unixtime"])`. Parameters ---------- data : pandas.DataFrame Dataframe containing columns 'easting' and 'northing in meters. Returns ------- pandas.Series Returns a pandas Series of the relative distances which can be assigned to a new column. """ return _relative_distance(data.easting.values, data.northing.values)
def cumulative_distance( data: pd.DataFrame, ) -> pd.Series: """ Calculate the cumulative distance along track in a dataframe. This assumes the data have been sorted by time, and that there are not flights which overlap in time. If there are, you can first sort by flight, then by time. For example, if you have columns 'flight' and 'unixtime', you can accomplish this with `df = df.sort_values(["flight", "unixtime"])`. Parameters ---------- data : pandas.DataFrame Dataframe containing columns 'easting' and 'northing in meters. Returns ------- pandas.Series Returns a pandas Series of the relative distances which can be assigned to a new column. """ return relative_distance(data).cumsum()
[docs] def along_track_distance( data: pd.DataFrame | gpd.GeoDataFrame, *, groupby_column: str | None = None, guess_start_position: bool = False, ) -> pd.Series: """ Calculate the distances along track in meters. The dataframe will be grouped by column `groupby_column`, and the distance will start from the first row of each group. For example, a group can be a survey, a flight, or an individual line. This assumes the data have been sorted by time, and that there are not flights which overlap in time. If there are, you can first sort by flight, then by time. For example, if you have columns 'flight' and 'unixtime', you can accomplish this with `df = df.sort_values(["flight", "unixtime"])`. Parameters ---------- data : pd.DataFrame | gpd.GeoDataFrame Dataframe containing the data points to calculate the distance along each line, must have a set geometry column. groupby_column : str | None, optional Column name to group by before sorting by time, by default None guess_start_position: bool, optional If True, this will determine the start of the line, not by the first row, but by finding the leftmost corner of the line. This is useful if you don't have a time column and your data is not sorted by time. Returns ------- pd.Series The along track distance in meters """ if guess_start_position: assert isinstance(data, gpd.GeoDataFrame) if groupby_column is None: # turn point data into line line = gpd.GeoSeries(LineString(data.geometry.tolist())) # find minimum rotated rectangle around line rect = line.iloc[0].minimum_rotated_rectangle # get angle of rotation angle = azimuth(rect) if 90 < angle <= 180: angle = angle - 180 # rotate the line to be horizontal line_horizontal = line.rotate(angle, origin=shapely.centroid(rect)) horizontal_df = line_horizontal.get_coordinates( index_parts=True, ignore_index=True, ) horizontal_df["original_index"] = data.index horizontal_df = horizontal_df.sort_values("x").reset_index(drop=True) horizontal_df = horizontal_df.rename( columns={"x": "easting", "y": "northing"} ) horizontal_df["tmp"] = cumulative_distance(horizontal_df) horizontal_df = horizontal_df.sort_values("original_index").set_index( "original_index" ) return horizontal_df.tmp data = data.copy() for _segment_name, segment_data in tqdm( data.groupby(groupby_column), desc="Segments" ): # turn point data into line line = gpd.GeoSeries(LineString(segment_data.geometry.tolist())) # find minimum rotated rectangle around line rect = line.iloc[0].minimum_rotated_rectangle # get angle of rotation angle = azimuth(rect) if 90 < angle <= 180: angle = angle - 180 # rotate the line to be horizontal line_horizontal = line.rotate(angle, origin=shapely.centroid(rect)) horizontal_df = line_horizontal.get_coordinates( index_parts=True, ignore_index=True, ) horizontal_df["original_index"] = segment_data.index horizontal_df = horizontal_df.sort_values("x").reset_index(drop=True) horizontal_df = horizontal_df.rename( columns={"x": "easting", "y": "northing"} ) horizontal_df["tmp"] = cumulative_distance(horizontal_df) horizontal_df = horizontal_df.sort_values("original_index").set_index( "original_index" ) data.loc[data[groupby_column] == _segment_name, "tmp"] = horizontal_df.tmp return data.tmp if groupby_column is None: return cumulative_distance(data) # iterate through groups, append distances, and concat distances = [] for _segment_name, segment_data in data.groupby(groupby_column): dist = cumulative_distance(segment_data) distances.append(dist) return np.concatenate(distances)
def eastward_velocity(lat_deg, lon_deg, time): """ Compute eastward velocity (m/s) from latitude, longitude, and time. Parameters ---------- lat_deg : array-like Latitude in degrees lon_deg : array-like Longitude in degrees time : array-like Time in seconds (difference between consecutive measurements should be ~1-10 s) Returns ------- v_east : ndarray Eastward velocity in m/s """ lat_rad = np.radians(np.asarray(lat_deg)) lon_rad = np.unwrap(np.radians(np.asarray(lon_deg))) # avoid Âą180 jumps time = np.asarray(time) dt = np.diff(time, prepend=np.nan) dt[dt == 0] = np.nan # avoid division by zero dlon = np.diff(lon_rad, prepend=np.nan) r = 6371000 # Earth radius in meters return r * np.cos(lat_rad) * dlon / dt def northward_velocity(lat_deg, lon_deg, time): """ Compute northward velocity (m/s) from latitude, longitude, and time. Parameters ---------- lat_deg : array-like Latitude in degrees lon_deg : array-like Longitude in degrees time : array-like Time in seconds (difference between consecutive measurements should be ~1-10 s) Returns ------- v_north : ndarray Northward velocity in m/s """ lat_rad = np.radians(np.asarray(lat_deg)) _lon_rad = np.unwrap(np.radians(np.asarray(lon_deg))) # avoid Âą180 jumps time = np.asarray(time) dt = np.diff(time, prepend=np.nan) dt[dt == 0] = np.nan # avoid division by zero dlat = np.diff(lat_rad, prepend=np.nan) r = 6371000 # Earth radius in meters return r * dlat / dt def _azimuth_between_points( point1: tuple[float, float], point2: tuple[float, float], ) -> float: """azimuth between 2 points (interval 0 - 180)""" angle = np.arctan2(point2[1] - point1[1], point2[0] - point1[0]) return np.degrees(angle) if angle > 0 else np.degrees(angle) + 180 # type: ignore[no-any-return] def _dist(a: tuple[float, float], b: tuple[float, float]) -> float: """distance between points""" return math.hypot(b[0] - a[0], b[1] - a[1]) def azimuth(mrr) -> float: # type: ignore[no-untyped-def] """azimuth of minimum_rotated_rectangle""" bbox = list(mrr.exterior.coords) axis1 = _dist(bbox[0], bbox[3]) axis2 = _dist(bbox[0], bbox[1]) if axis1 <= axis2: az = _azimuth_between_points(bbox[0], bbox[1]) else: az = _azimuth_between_points(bbox[0], bbox[3]) return az