#!/usr/bin/python3 import sys import os from configparser import ConfigParser from configparser import MissingSectionHeaderError import datetime import texttable import statistics import matplotlib.pyplot as plt from matplotlib import colors from matplotlib.ticker import PercentFormatter __CONTEXT = { "VERBOSE": False, "PARSE_CONFIG": False, "DATALOG_PATH": None, "OUTPUT_PATH": "./", "FILTER_BY_STDDEVS": 1, "DRAW_AVG_PLOT": 0, "DRAW_STDDEV_PLOT": False, "BOLD_LINES_ON_PLOT": False, "VALUE_MULTI": 1000000000, } def std_dev(values): avg = sum(values) / len(values) squares_sum = sum([(value - avg)**2 for value in values]) return (squares_sum / (len(values) - 1))**(0.5) def calc_avg(values): avg_plot_buf = [] avg_plot = [] for val in values: avg_plot_buf.append(val) if len(avg_plot_buf) > DRAW_AVG_PLOT: avg_plot_buf.pop(0) avg_plot.append(statistics.mean(avg_plot_buf)) return avg_plot def __calc_stats(values): return { "MIN": min(values), "MAX": max(values), "SPAN": max(values) - min(values), "MEAN": statistics.mean(values), "MEDIAN": statistics.median(values), "MODE": statistics.mode(values), "STDDEV": std_dev(values), } def calc_stats(values): twentyciles = statistics.quantiles(values, n=20) stats = { "Meta": { "Generated_from": DATALOG_PATH, "Generated_at": datetime.datetime.now().strftime("%Y.%m.%d"), "Number_of_measurements": len(values), }, "Params": __CONTEXT, "Main": __calc_stats(values), "Percentiles": { "5%": twentyciles[0], "10%": twentyciles[1], "25%": twentyciles[4], "50%": twentyciles[9], "75%": twentyciles[-5], "90%": twentyciles[-2], "95%": twentyciles[-1], } } if DRAW_AVG_PLOT: stats.update({"Average": __calc_stats(calc_avg(values))}) return stats def val_to_text(value): units = "Sec" # sec to msec if abs(value) < 1: value *= 1000 units = "mSec" # msec to usec if abs(value) < 1: value *= 1000 units = "uSec" # usec to nsec if abs(value) < 1: value *= 1000 units = "nSec" # nsec to psec if abs(value) < 1: value *= 1000 units = "pSec" return f"{value:+.3f} {units}" def text_to_val(text): if text.isdigit(): return int(text) multi_map = { "n": 1000000000, "u": 1000000, "m": 1000, "s": 1, } return multi_map[text] def init_table(): table = texttable.Texttable() table.set_deco(table.HEADER | table.VLINES | table.BORDER) table.set_chars(['-', '|', '|', '-']) table.set_cols_dtype(["t", "e", "f", "t"] if VERBOSE else ["t", "t"]) table.set_cols_width([8, 20, 20, 15] if VERBOSE else ["8", "20"]) table.set_cols_align(["r", "l", "l", "l"] if VERBOSE else ["r", "l"]) table.set_precision(12) table.header(["Name", "Value(e)", "Value(f)", "Value(t)"] if VERBOSE else ["Name", "Value"]) return table def print_stats(stats, label): print(f"\n{label}:") table = init_table() for stat, val in stats.items(): row = [stat] if VERBOSE: row += [val, val] row += [f"{val_to_text(val)}"] table.add_row(row) print(table.draw()) def do_statistics(): measurements = [] with open(DATALOG_PATH, 'r') as datalog: for line in datalog.readlines(): measurements.append(float(line.split('\n')[0])) stats = calc_stats(measurements) print_stats(stats["Main"], "Non-filtered") if FILTER_BY_STDDEVS: outliers = [value for value in measurements if abs(value) >= stats["Main"]["STDDEV"] * FILTER_BY_STDDEVS] measurements = [value for value in measurements if abs(value) < stats["Main"]["STDDEV"] * FILTER_BY_STDDEVS] print(f"\nOutliers({FILTER_BY_STDDEVS}xSTDDEV):") for outlier in outliers: print(val_to_text(outlier)) stats = calc_stats(measurements) print_stats(stats["Main"], "Filtered") print_stats(stats["Average"], "Average(filtered)") else: print_stats(stats["Average"], "Average(non-filtered)") print_stats(stats["Percentiles"], "Percentiles") return stats, measurements def do_plot(stats, measurements): # Simple plot linewidth = 1 if BOLD_LINES_ON_PLOT else 0.2 plt.figure(figsize=(15, 5)) if DRAW_STDDEV_PLOT: stddev = stats["Main"]["STDDEV"] * VALUE_MULTI mean = stats["Main"]["MEDIAN"] * VALUE_MULTI edges = [ ([mean + 1 * stddev, mean - 1 * stddev], "dotted", "grey"), ([mean + 2 * stddev, mean - 2 * stddev], "dashed", "grey"), ([mean + 3 * stddev, mean - 3 * stddev], "dashdot", "red"), ] for lines, style, color in edges: for line in lines: plt.axhline(y=line, color=color, linewidth=linewidth, linestyle=style) plt.plot(range(len(measurements)), [val * VALUE_MULTI for val in measurements], linewidth=linewidth) if DRAW_AVG_PLOT: avg_plot = [val * VALUE_MULTI for val in calc_avg(measurements)] plt.plot(range(DRAW_AVG_PLOT//2, len(avg_plot)+DRAW_AVG_PLOT//2), avg_plot, linewidth=linewidth) plt.grid(axis='both') plt.title('Generated from ' + DATALOG_PATH) plt.xlabel('Time, readings') plt.ylabel('Diff, nanoseconds') plt.savefig(os.path.join(OUTPUT_PATH, "plot.png"), dpi=300) # Simple scatter plt.figure(figsize=(15, 5)) plt.scatter(range(len(measurements)), [val * VALUE_MULTI for val in measurements], s=2) plt.grid(axis='both') plt.title('Generated from ' + DATALOG_PATH) plt.xlabel('Time, readings') plt.ylabel('Diff, nanoseconds') plt.savefig(os.path.join(OUTPUT_PATH, "scatter.png"), dpi=300) # Probability distribution fig, axs = plt.subplots(1, 1, tight_layout=True) plt.title('Generated from ' + DATALOG_PATH) plt.xlabel('Diff, nanoseconds') plt.ylabel('Proportion, %') N, bins, patches = axs.hist([val * VALUE_MULTI for val in measurements], bins=21) axs.yaxis.set_major_formatter(PercentFormatter(xmax=len(measurements))) plt.savefig(os.path.join(OUTPUT_PATH, "histogram.png"), dpi=300) def eat_param(param, type_class, i): if type_class == eat_unknown: eat_unknown(param, i) # raises exception, move it on if type_class == None: # Simple execute `name`, it is param() return 0 if type_class == bool: __CONTEXT.update({param: True}) return 0 try: val = type_class(sys.argv[i + 1]) except IndexError as err: key = sys.argv[i] raise IndexError(f"{err};\nYou trying to specify parameter with key {key} but haven't place it!") __CONTEXT.update({param: val}) return 1 def eat_unknown(name, i): raise ValueError(f"Unknown key `{name}` at position {i}") def show_help(exitcode=0): print(f"Usage: {sys.argv[0]} [-hv] -f FILE -o DIRECTORY [-F VALUE]") print("\t-h, --help\t— show this message") print("\t-f FILE \t— give a input csv-file") print("\t-p FILE \t— give a file that contains parameters(as showed in [Params] of stats.txt)") print("\t-o DIRECTORY \t— give a output directory") print("\t-F VALUE \t— give a number of STDDEVs to use in filter (`0` means do not filter, default is `1`)") print("\t-A VALUE \t— draw avg plot for given count of measurements (`0`(default) means do not draw)") print("\t-M VALUE \t— multiply measurements to make it one of: `n`(default), `u`, `m`, " + \ "`s`(means `1`),\n\t\t\t or any numeric multiplicator.") print("\t-b\t\t— bold lines on a plot") print("\t-s\t\t— draw STDDEV lines on a plot") print("\t-v\t\t— verbose (show digits in scientific and very long float)") sys.exit(exitcode) ARG_MAP = { "-v": (bool, "VERBOSE"), "-b": (bool, "BOLD_LINES_ON_PLOT"), "-s": (bool, "DRAW_STDDEV_PLOT"), "-f": (str, "DATALOG_PATH"), "-o": (str, "OUTPUT_PATH"), "-F": (int, "FILTER_BY_STDDEVS"), "-A": (int, "DRAW_AVG_PLOT"), "-M": (text_to_val, "VALUE_MULTI"), "-p": (str, "PARSE_CONFIG"), "--help": (None, show_help), "-h": (None, show_help), } def eat_args(): i = 0 while (i := i + 1) < len(sys.argv): arg = sys.argv[i] type_class, param = ARG_MAP.get(arg, (eat_unknown, arg)) i += eat_param(param, type_class, i) g = globals() g.update(__CONTEXT) def startup(): eat_args() if PARSE_CONFIG: config = ConfigParser() try: config.read(PARSE_CONFIG) except configparser.MissingSectionHeaderError as err: print(err) config_string = "[Params]\n" with open(PARSE_CONFIG, 'r') as config_file: config_string += [line for line in config_file.readlines()] config.read_string(config_string) try: for key, val in config["Params"].items(): key = key.upper() for arg in ARG_MAP.values(): if key == arg[1]: if arg[0] == bool: val = True if val == 'True' else False break val = arg[0](val) break __CONTEXT.update({key: val}) except KeyError as err: print(err) print("No [Params] section?") sys.exit(-3) g = globals() g.update(__CONTEXT) # Do it again to make CLI to prior eat_args() if not DATALOG_PATH: print("Gimme input data!") show_help(-1) if not os.path.exists(OUTPUT_PATH): os.mkdir(OUTPUT_PATH) elif not os.path.isdir(OUTPUT_PATH): print("Gimme directory as output path!") show_help(-2) def main(): startup() stats, measurements = do_statistics() with open(os.path.join(OUTPUT_PATH, "filtered.csv"), 'w') as output_csv: for value in measurements: output_csv.write(f"{value:+.12E}\n") parser = ConfigParser() parser.read_dict(stats) with open(os.path.join(OUTPUT_PATH, "stats.txt"), 'w') as output: parser.write(output) do_plot(stats, measurements) if __name__ == '__main__': main()