Source code for wltp.idgears

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2014-2019 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
*(OUTDATED, irrelevant)* Detects vehicle's gear-ratios from cycle-data and reconstructs the gears-profile by identifying the actual gears used.

.. NOTE::
    NOT RELEVANT to the generation of wltp cycles.

The following terms and arguments are used throughout this module:

gear-ratio(s):
    the "NoverV", which is the inverse of the "engine-speed-to-velocity" ("STV").
    Conceptually it is the number of engine revolutions required, in RPM,
    in order for the the vehicle to travel with 1 km/h.

    .. Note::
        The inverses of the STVs ("VoverN") are used throughout the calculations
        because they lie practically within (0, 1), and are more evenly spaced.
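
    For example, an engine rotating at 3000 rpm while the vehicle travels at
    100 km/h gives a "NoverV" of 3000/100 = 30 rpm per km/h,
    and hence a "VoverN" of 100/3000 ≈ 0.033.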

cycle-ratios:
    The actual or detected NoverV-ratio on each cycle-point.

cycle-gears:
    The actual or identified gear used on each cycle-point.

cycle-data:
    a :class:`pd.DataFrame` containing (at the beginning) `N` and `V` columns
    with [rpm] and [km/h] as units respectively.
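
    For example (an illustrative sketch, not from the original module),
    such a frame can be built as::

        cycle_df = pd.DataFrame({
            "N": [800.0, 1500.0, 2500.0],   # engine-speed [rpm]
            "V": [0.0, 15.3, 42.1],         # velocity [km/h]
        })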

The detection of the gear-ratios happens in 3 steps: approximation (or "guessing"), estimation, and selection:

1. Produce a number of different *approximate* sets of gear-ratios (the "guesses")
   by producing histograms of the ratios with different combinations of bins, and then
   taking the first #gears of peaks.
2. Feed these "guessed" sets of gear-ratios into a robust (with median) kmeans clustering algorithm, and
3. pick the gear-ratios with the minimum distortion.

The identification of the actual gear on each cycle-data point (reconstructing the gear-profile)
happens in 2 steps (see the usage sketch below):

1. On each cycle-point, select the gear whose ratio has the smallest absolute difference
   from the respective identified ratio;
2. Reconstruct the engine-speed by dividing the velocity-profile with the identified
   (VoverN) gear-ratio on each cycle-point.
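
For example (an illustrative usage sketch, assuming a 6-gear vehicle
and a `cycle_df` as described above)::

    gear_ratios, distortion = detect_gear_ratios_from_cycle_data(6, cycle_df)
    gears, distort = identify_gears_from_cycle_data(cycle_df, gear_ratios)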

"""
## TODO: Rename columns N, V.
## TODO: Classify code.

from collections import namedtuple
import functools as ft
import math
import numpy as np, pandas as pd
from scipy import signal


def dequantize_unstabbleMean(df, method="linear"):
    """
    :param str method: see :meth:`pd.DataFrame.interpolate()` that is used
        to fill-in column-values that are held constant
    """
    edges = df.diff()
    df_1 = df.where(edges != 0)  ## Points after change.
    df_0 = df.where(edges.shift(-1) != 0)  ## Points before change.
    df_21 = 2 * (df_0.shift(1) + df_1) / 3  ## 2/3 * sum located after change
    df_12 = df_21.shift(-1) / 2  ## 1/3 * sum located before change
    df2 = df_21 + df_12
    df2.iloc[[0, -1], :] = df.iloc[[0, -1], :]  ## Pin start-end values to originals.
    df2.interpolate(method=method, inplace=True)

    return df2
def dequantize(df, method="linear"):
    """
    Tries to undo the results of a non-constant sampling rate
    (ie due to non-deterministic buffering).

    :param str method: see :meth:`pd.DataFrame.interpolate()` that is used
        to fill-in column-values that are held constant

    .. Note::
        The results will never have a flat section, so checking for some value
        (ie 0) must always apply within some margin (ie ``(-1e-4, 1e-4)``).
    """
    df1 = df.where(df.diff() != 0)
    df1.iloc[[0, -1]] = df.iloc[[0, -1]]  ## Pin start-end values to originals.
    df1.interpolate(method=method, inplace=True)

    return df1
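
## An illustrative usage sketch of `dequantize()` (not part of the original
#  module): flat runs in a step-held series become linear ramps.
#
def _demo_dequantize():  # pragma: no cover
    df = pd.DataFrame({"V": [0.0, 0.0, 0.0, 10.0, 10.0, 20.0, 30.0]})
    df2 = dequantize(df)
    ## All flat sections are gone, eg the run of 0.0s became a ramp towards 10.0,
    #  so zero-checks must now use a margin, as noted in the docstring above.
    return df2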
def identify_n_idle(N):
    ## Estimate n_idle as the peak-N on the lower-half.
    #
    # N = N[N < N.median()]
    peaks, _, _ = find_histogram_peaks(N, int(math.sqrt(len(N)) + 1))
    n_idle_0 = peaks["value"].iloc[0]  ## The value of the most-populated peak.
    # print(n_idle_0)

    ## Rebin-smoothly around 1-stdev of estimated n_idle,
    # to increase accuracy.
    #
    jog = N.std() / 2
    N = N[(N > (n_idle_0 - jog)) & (N < (n_idle_0 + jog))]
    peaks, _, _ = find_histogram_peaks(N, int(math.sqrt(len(N)) + 1), 3)

    return peaks["value"].iloc[0]


def _outliers_filter_df2(df, cols):
    for col in cols:
        mean = df[col].median()
        # std = np.subtract(*np.percentile(df[col], [75, 25]))
        std = df[col].mad()
        df = df[(df[col] >= mean - 2 * std) & (df[col] <= mean + 2 * std)]

    return df


def _outliers_filter_df(df, cols):
    def filter_col(col):
        mean = df[col].median()
        # std = np.subtract(*np.percentile(df[col], [75, 25]))
        std = df[col].mad()

        return np.abs(df[col] - mean) <= 2 * std

    return ft.reduce(np.bitwise_and, (filter_col(col) for col in cols))


def _transient_filter_df(df, col, std_threshold=0.2, apply_dequantize=True):
    if apply_dequantize:
        df[col] = dequantize(df[col])
    grad = np.gradient(df[col])
    grad_mean, grad_std = grad.mean(), grad.std()
    thres = (grad_mean - std_threshold * grad_std, grad_mean + std_threshold * grad_std)
    df1 = df.loc[(thres[0] < grad) & (grad < thres[1]), :]

    return df1


def _n_idle_filter_df(df, col, n_idle_threshold=1.05):
    n_idle = identify_n_idle(df[col])
    df1 = df[df[col] > n_idle_threshold * n_idle]

    return df1


def _filter_cycle_and_derive_ratios(df, filter_outliers=None, filter_transient=True):
    ## Work with VoverN because evenly spaced (not log), and 0 < VoverN < 1.
    df["VoverN"] = df.V / df.N
    df0 = df[(df.V > 2) & (df.VoverN > 0)]
    df1 = _n_idle_filter_df(df0, "N")
    df2 = _transient_filter_df(df1, "VoverN")
    # valids = _outliers_filter_df(df2, ['VoverN']) if filter_outliers else df2  # PROBLEMATIC!!
    # df3 = df2[valids]

    return df2
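
## An illustrative sketch of `identify_n_idle()` (not part of the original
#  module) on synthetic data: idle-points cluster tightly around ~800 rpm,
#  so the most-populated histogram-peak lands near that value.
#
def _demo_identify_n_idle():  # pragma: no cover
    rng = np.random.RandomState(0)
    N = pd.Series(np.r_[rng.normal(800, 20, 500), rng.uniform(1000, 4000, 500)])
    return identify_n_idle(N)  ## --> ~800 [rpm]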
def moving_average(x, window):
    """
    :param int window: odd windows preserve phase

    From http://nbviewer.ipython.org/github/demotu/BMC/blob/master/notebooks/DataFiltering.ipynb
    """
    return np.convolve(x, np.ones(window) / window, "same")
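
## An illustrative sketch of `moving_average()` (not part of the original
#  module): a 3-point window spreads a spike while preserving its phase.
#
def _demo_moving_average():  # pragma: no cover
    x = np.array([0.0, 0.0, 9.0, 0.0, 0.0])
    return moving_average(x, 3)  ## --> [0., 3., 3., 3., 0.]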
def find_histogram_peaks(df, bins, smooth_window=None):
    h_points, bin_edges = np.histogram(df, bins=bins, density=True)
    h_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
    if smooth_window:
        h_points = moving_average(h_points, smooth_window)
    peak_bins = signal.argrelmax(h_points, mode="clip")
    peak_bins = peak_bins[0]
    peaks = pd.DataFrame(
        {
            "bin": peak_bins,
            "value": h_centers[peak_bins],
            "population": h_points[peak_bins],
        }
    )
    peaks = peaks.sort_values(["population"], ascending=False)

    return peaks, h_centers, h_points
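
## An illustrative sketch of `find_histogram_peaks()` (not part of the original
#  module): two synthetic ratio-clusters yield two peaks, the most populated first.
#
def _demo_find_histogram_peaks():  # pragma: no cover
    rng = np.random.RandomState(1)
    ratios = np.r_[rng.normal(0.1, 0.005, 300), rng.normal(0.3, 0.005, 100)]
    peaks, _, _ = find_histogram_peaks(ratios, bins=40)
    return peaks  ## `peaks['value']` contains ~0.1 (top) and ~0.3.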
def _approximate_ratios_by_binning(ratios, bins, ngears):
    """
    :return: 3 sets of ratios:

        1. all peaks detected with bins specified
        2. the first #gears peaks
        3. the first #gears peaks detected by binning the space between
           the previous #gears identified
    """
    peaks0, _, _ = find_histogram_peaks(ratios, bins)
    # display(peaks0)
    peaks1 = peaks0[:ngears].sort_values(
        ["value"], ascending=True
    )  ## Keep #<gears> of most-populated bins.
    # display(peaks1)

    ## Re-bin with narrower X-to-bin.
    #
    r_min = peaks1["value"].min()
    r_max = peaks1["value"].max()
    gear_distance = (r_max - r_min) / ngears
    r_min -= gear_distance
    r_max += gear_distance
    peaks2, h_centers, h_points = find_histogram_peaks(
        ratios[(ratios >= r_min) & (ratios <= r_max)], bins
    )
    # display(peaks2)
    peaks3 = peaks2[:ngears].sort_values(
        ["value"], ascending=True
    )  ## Keep #<gears> of most-populated bins.
    # display(peaks3)

    return peaks0, peaks1, peaks3, h_points, h_centers
def _norm1_1d_vq(obs, guess, is_robust=False):
    """Simplified from scipy.cluster.vq.py_vq2()"""
    dist = np.abs(obs[np.newaxis, :] - guess[:, np.newaxis])

    if is_robust:  ## Does not make a difference!
        ## Bisquare M-estimator with 95% efficiency under the Gaussian model.
        robust_prcntile = 4.685
        R_deleved = dist / (
            robust_prcntile * 1.4826 * np.median(dist)
        )  ## * sqrt(1 - hat_vector))
        ## Calc the robust bisquared-residuals excluding outliers.
        R_weights = (R_deleved < 1) * (1 - R_deleved ** 2) ** 2
        dist = R_weights * dist

    code = np.argmin(dist, 0)
    min_dist = np.minimum.reduce(dist, 0)

    return code, min_dist
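
## An illustrative sketch of `_norm1_1d_vq()` (not part of the original
#  module): each observed ratio is assigned to its nearest "gear-line" guess.
#
def _demo_norm1_1d_vq():  # pragma: no cover
    obs = np.array([0.11, 0.19, 0.31])
    guess = np.array([0.1, 0.2, 0.3])
    code, min_dist = _norm1_1d_vq(obs, guess)
    return code, min_dist  ## code --> [0, 1, 2], min_dist --> [~0.01, ~0.01, ~0.01]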
def _estimate_ratios_by_kmeans(obs, guess, **kmeans_kws):
    """
    Adapted kmeans to use norm1 distances (identical to euclidean-norm2 for 1D)
    and using median() for each guess, to ignore outliers and identify "gear-lines".
    """

    def _1d_kmeans(obs, guess, guess_func, thresh=1e-5):
        """The "raw" version of k-means, adapted from scipy.cluster.vq._kmeans()."""
        code_book = np.array(guess, copy=True)
        avg_dist = []
        diff = thresh + 1.0
        while diff > thresh:
            nc = code_book.size

            ## Compute membership and distances between obs and code_book.
            obs_code, distort = _norm1_1d_vq(obs, code_book)
            avg_dist.append(np.mean(distort))

            ## Recalc code_book as centroids of associated obs.
            if diff > thresh:
                has_members = []
                for i in np.arange(nc):
                    cell_members = obs[obs_code == i]
                    if cell_members.size > 0:
                        code_book[i] = guess_func(cell_members)
                        has_members.append(i)

                ## Remove code_books that didn't have any members.
                code_book = code_book[has_members]
            if len(avg_dist) > 1:
                diff = avg_dist[-2] - avg_dist[-1]
        # print(avg_dist)

        return code_book, avg_dist[-1]

    centers, distortion = _1d_kmeans(obs, guess, guess_func=np.median, **kmeans_kws)
    centers = np.sort(centers)

    return centers, distortion
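
## An illustrative sketch of `_estimate_ratios_by_kmeans()` (not part of the
#  original module): a rough guess is pulled onto the medians of two synthetic
#  "gear-line" clusters, with a small final distortion.
#
def _demo_estimate_ratios_by_kmeans():  # pragma: no cover
    rng = np.random.RandomState(2)
    obs = np.r_[rng.normal(0.1, 0.002, 200), rng.normal(0.3, 0.002, 200)]
    centers, distortion = _estimate_ratios_by_kmeans(obs, np.array([0.12, 0.28]))
    return centers, distortion  ## centers --> [~0.1, ~0.3]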
## The artifacts of each detection attempt.
#
Detekt = namedtuple(
    "Detekt", ["distort", "guess", "final", "all_peaks_df", "hist_X", "hist_Y"]
)


def _new_Detekt_approximation(guess_ratios, all_peaks_df, hist_X, hist_Y):
    return Detekt(None, guess_ratios, None, all_peaks_df, hist_X, hist_Y)


def _set_Detekt_final(gears, final_ratios, distortion):
    return gears._replace(distort=distortion, final=final_ratios)
def _append_aproximate_Detekt(ngears, cases, ratios, all_peaks_df, hist_X, hist_Y):
    """
    Appends 2 sets of ratios for each one specified as input,
    the 2nd one being a linspace between the min-max.
    """
    if ratios.size == ngears:
        cases.append(_new_Detekt_approximation(ratios, all_peaks_df, hist_X, hist_Y))
    cases.append(
        _new_Detekt_approximation(
            np.linspace(ratios.min(), ratios.max(), ngears),
            all_peaks_df,
            hist_X,
            hist_Y,
        )
    )
def _gather_guessed_Detekts(ngears, cycle_df):
    """Makes a number of gear-ratios guesses based on histograms with different bin combinations."""
    guessed_detekts = []  ## A list-of-Detekt.

    bins_cases = [
        (ngears + 2) * 6,  ## Roughly 6 bins per gear, plus 2 pseudo-gears left & right.
        int(math.sqrt(cycle_df.shape[0])),  ## `np.histogram()` needs integer bins.
    ]
    for bins in bins_cases:
        all_peaks_df, peaks_df1, peaks_df2, hist_X, hist_Y = _approximate_ratios_by_binning(
            cycle_df.VoverN, bins=bins, ngears=ngears
        )
        _append_aproximate_Detekt(
            ngears, guessed_detekts, peaks_df1.value.values, all_peaks_df, hist_X, hist_Y
        )
        _append_aproximate_Detekt(
            ngears, guessed_detekts, peaks_df2.value.values, all_peaks_df, hist_X, hist_Y
        )

    ## Add linspace for the complete ratio-range
    # using the 1st Detekt as template.
    #
    guessed_detekts.append(
        guessed_detekts[0]._replace(
            guess=np.linspace(cycle_df.VoverN.min(), cycle_df.VoverN.max(), ngears)
        )
    )

    return guessed_detekts
def _gather_final_Detekts(ngears, cycle_df, guessed_gears):
    """
    :param pd.DataFrame cycle_df: a dataframe with `N` and `V` columns
        with units [rpm] and [km/h] respectively
        (or inferred by comparing the means of these 2 columns).
    """
    final_detekts = []
    for gears in guessed_gears:
        ratios, distort = _estimate_ratios_by_kmeans(cycle_df.VoverN.values, gears.guess)
        if ratios.size == ngears:
            final_detekts.append(_set_Detekt_final(gears, ratios, distort))

    final_detekts = sorted(final_detekts, key=lambda x: x.distort)

    return final_detekts
def run_gear_ratios_detections_on_cycle_data(ngears, cycle_df):
    """
    Invoke this one if you want to draw the results.

    :param pd.DataFrame cycle_df: it must contain (at least) `N` and `V` columns
        (units: [rpm] and [km/h] respectively)
    :return: a list of all :class:`Detekt` tuples, sorted with the most probable
        one at the head; its 1st element is the solution
    """
    filtered_df = _filter_cycle_and_derive_ratios(cycle_df)
    guessed_detekts = _gather_guessed_Detekts(ngears, filtered_df)
    final_detekts = _gather_final_Detekts(ngears, filtered_df, guessed_detekts)

    return final_detekts
def detect_gear_ratios_from_cycle_data(ngears, cycle_df):
    """
    Use a 2-step procedure if you want to plot the results, by invoking
    `run_gear_ratios_detections_on_cycle_data()` and `plot_idgears_results()` separately.

    :return: a 2-tuple of ``(gear_ratios, distortion)``, where `gear_ratios` is
        a :class:`ndarray` with the detected gear-ratios (for the STVs, inverse them)
    """
    detekts = run_gear_ratios_detections_on_cycle_data(ngears, cycle_df)
    if detekts[0].final is not None:
        return detekts[0].final, detekts[0].distort
    else:
        raise Exception(
            "Detection failed to estimate any gear-ratios!\n All-Detekts(%s)" % detekts
        )
def identify_gears_from_cycle_data(cycle_df, gear_ratios):
    """
    Like :meth:`identify_gears_from_ratios()` but with more sane filtering (ie v above 1 km/h).

    :param pd.DataFrame cycle_df: it must contain (at least) `N` and `V` columns
        (units: [rpm] and [km/h] respectively)
    :param ndarray gear_ratios: the same as :meth:`identify_gears_from_ratios()`
    :return: the same as :meth:`identify_gears_from_ratios()`
    """
    df = _filter_cycle_and_derive_ratios(cycle_df)

    return identify_gears_from_ratios(df.VoverN, gear_ratios)
def identify_gears_from_ratios(cycle_ratios, gear_ratios):
    """
    Note that the returned arrays miss any non-finite (NaN) cycle-points!

    :param ndarray cycle_ratios: a single-column array/list/ndarray of ratios (STVs or inverse).
    :param ndarray gear_ratios: a list/ndarray of gear-ratios with len equal to the #gears
    :return: a 2-tuple, where [0] is the 0-based identified-gears,
        and [1] the distortion for each cycle-point.
    """
    cycle_ratios = np.asarray(cycle_ratios).flatten()
    nums = np.isfinite(cycle_ratios)
    gears, distort = _norm1_1d_vq(cycle_ratios[nums], np.asarray(gear_ratios).flatten())

    return gears, distort
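
## An illustrative sketch of `identify_gears_from_ratios()` (not part of the
#  original module): note how the NaN cycle-point is dropped from the output.
#
def _demo_identify_gears_from_ratios():  # pragma: no cover
    cycle_ratios = [0.11, np.nan, 0.29, 0.52]
    gear_ratios = [0.1, 0.3, 0.5]
    gears, distort = identify_gears_from_ratios(cycle_ratios, gear_ratios)
    return gears, distort  ## gears --> [0, 1, 2] (0-based), only 3 of the 4 points.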
def reconstruct_engine_speed(cycle_df, gear_ratios):
    """
    Adds the `GEARS_detekted`, `VoverN_reconstructed` and `N_reconstructed` columns into `cycle_df`.

    :param ndarray gear_ratios: a list/ndarray of gear-ratios with len equal to the #gears

    TODO: Class-method
    """
    gear_ratios = np.asarray(gear_ratios).flatten()
    ## NOTE: Assumes `cycle_df['VoverN']` contains no NaNs (ie cycle-data already
    #  filtered), or else the identified-gears would be fewer than the cycle-points.
    gears, _ = identify_gears_from_ratios(cycle_df["VoverN"], gear_ratios)
    cycle_df["GEARS_detekted"] = gears + 1  ## 1-based gears.
    cycle_df["VoverN_reconstructed"] = gear_ratios[gears]
    cycle_df["N_reconstructed"] = cycle_df["V"] / cycle_df["VoverN_reconstructed"]
def plot_idgears_results(cycle_df, detekt, fig=None, axes=None, original_points=None):
    """
    :param detekt: A Detekt-namedtuple with the data to plot
    """
    from matplotlib import pyplot as plt

    if not fig:
        fig = plt.figure(figsize=(18, 10))
    fig.suptitle(
        "Detekt: %s, Accuracy: %s (wltp-good: ~< 1e-3)" % (detekt.final, detekt.distort)
    )

    if axes:
        ax1, ax2, ax3 = axes
    else:
        ax1 = plt.subplot(2, 2, 1)
        ax2 = plt.subplot(2, 2, 2)
        ax3 = plt.subplot(2, 1, 2)
    ax31 = ax3.twinx()
    ax32 = ax31.twinx()

    peaks_stv = 1 / detekt.guess
    kmeans_stv = 1 / detekt.final

    ## Plot engine-points.
    #
    ax1.plot(cycle_df.V, cycle_df.N, "g+", markersize=5)
    if original_points is not None:
        ax1.plot(original_points.V, original_points.N, "k.", markersize=1)
    for r in peaks_stv:
        ax1.plot([0, cycle_df.V.max()], [0, cycle_df.V.max() * r], "c-", alpha=0.5)
    for r in kmeans_stv:
        ax1.plot([0, cycle_df.V.max()], [0, cycle_df.V.max() * r], "r-", alpha=0.5)
    ax1.set_ylim(0, cycle_df.N.median() + 2 * cycle_df.N.mad())
    ax1.set_xlim(
        cycle_df.V.median() - 2 * cycle_df.V.mad(),
        cycle_df.V.median() + 2 * cycle_df.V.mad(),
    )

    ## Plot ratios' histogram.
    #
    ax2.plot(detekt.hist_Y, detekt.hist_X)
    ax2.plot(
        detekt.all_peaks_df.value,
        detekt.all_peaks_df.population,
        "ob",
        markersize=8,
        fillstyle="none",
    )
    # ax2.plot(peaks_df.value, peaks_df.population, 'ob', markersize=8, fillstyle='full')

    ## Annotate top-#detekt.
    # plt.hlines(detekt.all_peaks_df.population.mean() - detekt.all_peaks_df.population.mad(), 0, detekt.hist_Y.max(), 'g', linestyle='-')

    ## Scatter-plot ratios.
    #
    R = cycle_df.VoverN
    # ax31.plot(cycle_df.N, 'm-', lw=0.5, markersize=2)
    ax31.plot(cycle_df.V, "b-", lw=0.5, markersize=2)
    ax3.plot(R, "g.-", lw=0.5, markersize=2)
    if original_points is not None:
        ax3.plot(original_points.V / original_points.N, "k.", markersize=1)
    for r in detekt.all_peaks_df.value:
        plt.hlines(r, 0, R.shape[0], "c", linestyle=":")
    for r in detekt.guess:
        plt.hlines(r, 0, R.shape[0], "c", linestyle="-.")
    for r in detekt.final:
        plt.hlines(r, 0, R.shape[0], "r", linestyle="--")
    plt.ylim(R.min(), R.max())
    plt.xlim(0, R.shape[0])