ehmatthes.com

code, politics, and life

Cleaning up a messy, exploratory Python project

Aug 31, 2020

One of my current projects is in a state that many people will probably recognize: it’s a small project that grew through the exploratory phase, got really messy, but has now proven to be really useful. I started this project to find out if a simple river gauge in my community could be used to assess the immediate risk of landslides in real time. I want to share this project more widely, but it’s an embarrassingly disorganized mess right now, and it’s inefficient as well. How do I clean up the project so it’s ready to share, and in a state where I can comfortably work on it further?

In the past I’d just look for the ugliest, most repetitive parts of the project and start refactoring. But I recently finished reading Serious Python, and came away from it with a much more structured approach to refactoring and optimization:

- Make sure the project is documented and runs cleanly.
- Write tests for the most important output.
- Benchmark and profile the code, so optimization work is guided by data rather than guesses.
- Optimize and refactor, keeping the tests passing along the way.

Rather than just do this on my own, I’ll write about my process as I go. You’ll get to see how messy my original code was, and how much this structured approach to optimization and refactoring helps. It’s a public project, so if you’re interested you can check out the project in its original, messy state and do all these steps yourself. Here goes.



Overview

In August 2015, an intense period of rain led to a landslide that killed three people in Sitka, Alaska. This rain event also caused numerous other landslides along the Sitka road system, and on hillsides in the surrounding area. Since that time, people have been looking for ways to assess landslide risk in real time.

A number of people have noticed a correlation between the behavior of a local stream gauge and landslide activity. Our mountains rise steeply from the ocean, and our valleys are short and steep. This means many of our watersheds are fairly small; a rain event in our local area causes a rapid response in our rivers. They rise quickly almost as soon as it starts to rain heavily, and when the rain tapers off the rivers recede quickly as well. Watching a river’s height over time during a significant rain event seems to be a proxy for measuring factors such as soil moisture content that directly lead to slides, but are more difficult to measure and monitor.

Indian River valley lies to the northeast of downtown Sitka. Our mountains rise steeply from the ocean, and our watersheds are relatively small.

To determine the usefulness of the stream gauge as a way to assess ongoing landslide risk, I analyzed the historical data from the stream gauge. I made a list of known landslide events along the Sitka road system, and then analyzed the last five years of stream data to see how strong the correlation is between stream gauge behavior and landslide activity. You can read a full writeup here. I’d like to write up this preliminary analysis as a formal research paper, and if the correlation is reliable enough, help build a public monitoring tool for the stream gauge that can be used as part of a local landslide warning system. This is part of a larger effort to monitor the direct causes of landslide activity such as soil moisture content; that work is ongoing, but there is not enough historical data to build a warning system from that data yet.

Here’s what readings from the stream gauge looked like during an intense period of rain in September of 2019 that led to a small landslide across a utility road in Sitka:

Indian River stream gauge readings during a rain event that led to a landslide. Readings in red represent a period of increased risk for landslides. The shaded region shows where the readings would need to be to have a high risk of slides.

The stream gauge seems to act as a proxy for primary landslide indicators such as soil moisture content. Basically, when the river is rising rapidly over an extended period of time, this is an indication that soil moisture content is rising. The blue line represents the height of the river over time. The red shaded region represents conditions when the risk of slides has been elevated in the past. The red region is not just based on river height; it’s based on river height, and how quickly the river is rising. In a 5-year period from 2014-2019, the river reached the red shaded region 9 times; 3 of those periods resulted in a slide. There may have been a couple more slides during these periods as well. It’s difficult to place the timing of a landslide if no one observed it happen.

It should also be noted that there is value not just in identifying periods of high risk for landslide activity, but also in identifying periods of low risk. The experience of having three community members in a small town killed in one event was traumatic enough that many people experience anxiety any time there is heavy rain; they have no way of assessing how much heavy rain is needed to cause a slide. Being able to tell people when we are in a period of low risk has almost as much value as telling people when the risk is high.

The code for this analysis was exploratory; I developed the code for the sole purpose of completing an initial analysis which could help decide whether it was worth including stream gauge data in the larger landslide risk monitoring project. That initial work has proven worthwhile. In order to move forward, the code for this analysis really should be cleaned up to support validation of the initial analysis, and to support further analysis of historical and current data. This is especially helpful in building an effective real-time monitoring tool.

Here’s an animation showing what a monitoring system based on this visualization might look like in real time for the Medvejie event from 2019:


Following along

If you want to do the optimization and refactoring yourself, the following commands will get you the project in the same state this post starts from:

$ git clone https://github.com/ehmatthes/sitka_irg_analysis.git
$ cd sitka_irg_analysis
$ git checkout f121c78
$ python3 -m venv irg_env
$ source irg_env/bin/activate
(irg_env)$ pip install -r requirements.txt

I’m making a branch for this post called cleaning_messy_project. The work on this branch should mostly match what’s covered in this post. I will refer to specific commits throughout the post, if you want to try one aspect of optimization without running through all of the changes yourself.
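If you’re following along, something like this will create and switch to that branch:

$ git checkout -b cleaning_messy_project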

Focus

The focus for this post will be to clean up the file process_hx_data.py, and some of the code that this file calls. There will be plenty more work to do on this project, but we’ll get a lot out of cleaning up this one file.

Documentation: What does this file do?

To get started, let’s make sure the docstring at the top of process_hx_data.py is current and accurate. The hypothesis for this project is that we can assess landslide risk if we can identify critical values for the total rise of the river, with a sustained critical rate of rise. This file processes historical data from the stream gauge and creates a series of visualizations that show what the stream gauge data looked like during known slide events. It also identifies periods that met or exceeded the critical values we’ve chosen. We also want to be clear about what the purpose of this analysis is.

Here’s a cleaner version of the docstring that reflects this purpose:

"""Process all historical IR gauge data.

This file loads historical data from the Indian River stream gauge. It loads
  values for critical rate of rise, and critical total rise. It then runs
  through all the stream gauge data. It flags any data point at which the 
  stream gauge has met or exceeded these critical values, and captures a
  48-hour period of readings around that point. When these readings are
  plotted, any known slides that occurred during this period are plotted as
  well.

  The script then processes known slide events. If a slide event is not
  already associated with a critical period, 48 hours of readings around that
  slide event are grabbed, and these are plotted as well.

We want to be able to use the output to answer the following questions. In
  this context, 'notification time' refers to the time between the first
  critical point identified in a critical period, and the time at which a
  slide occurred. This is the amount of time people would have to respond
  if a notification were issued the moment the first critical point was
  identified.

  A true positive is a critical point associated with a slide,
  a false positive is a critical point with no associated slide. A false 
  negative is a slide with no associated critical point.

  - How many slides were preceded by a critical point?
  - What do the notification times look like?
  - How many true positives, false positives, and false negatives were there?
  - How many notifications would have been issued over a 5-year period?
  - Was there anything special about slides that were missed? (false negatives)
  - Was there anything special about critical points with no slides? (false
      positives)
"""

Running process_hx_data.py

Now let’s run process_hx_data.py so we can see if we’re ready for testing and benchmarking.

Here’s the output from running this file:

Reading historical data from ir_data_clean/irva_utc_072014-022016_hx_format.txt.
  First reading: 07/14/2014 23:00:00 - 21.21
  Found 12912 readings.
...
Slides outside range: 2
  Beaver Lake Slide 11/2011 (wind and snowmelt?)
  Redoubt Slide 5/2013 (not on Sitka road system)

The file runs with no apparent issues. Before we write any tests, let’s ignore the html and png plot files that are generated each time we run this file.

Ignoring plot files

I originally committed the output files, because I wanted non-programming collaborators to be able to see them without having to run the code. This was a good idea initially, but now it’s cluttering up my git status calls during development. Interactive plots generated by Plotly are basically giant JavaScript files, and these files contain hashes that are different every time the plot is generated. So even though the plots themselves haven’t changed, they’ll look different to Git every time they’re regenerated.

To clean up git status reports and commits, I’ll ignore current_ir_plots/ from now on. I’ll also add a new directory current_ir_plots_committed/. I’ll copy over the current output files and commit and push those, but then I’ll only update those when we’ve changed the analysis or the visualizations. We won’t have to deal with re-committing these files every time we make a simple optimization.

If you’re not familiar with the process of ignoring files that have already been committed, follow these steps:
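The gist is to add the directory to .gitignore, and then tell Git to stop tracking the files it has already committed. Something along these lines should work (standard Git commands; this is a sketch rather than the exact commands I ran):

$ echo "current_ir_plots/" >> .gitignore
$ git rm -r --cached current_ir_plots/
$ git add .gitignore
$ git commit -m "Stop tracking generated plot files."

After that, regenerating the plots shouldn’t show up in git status at all: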

$ python process_hx_data.py
$ git status
nothing to commit, working tree clean

Now that we’ve got a baseline established and we have cleaner commits, let’s write some tests.

Critical tests

I haven’t written any tests for this project yet, which was a reasonable approach up until this point. Things were changing rapidly, and this is the first time I’ve had output worth testing. These are the kinds of output that are generated:

- static png plots of each critical period and slide event, in current_ir_plots/
- interactive html plots generated by Plotly, also in current_ir_plots/
- pkl files containing the relevant sets of readings, in other_output/

The html files are generated by Plotly, and since these files differ slightly every time they’re generated, we won’t test them at the moment. We’ll assume that if the static visualizations are correct, then the html files are correct as well.

Writing output to the tests/ directory

Currently, output is written to current_ir_plots/ and other_output/. Let’s create a tests/ directory, and modify the code that generates output so we can choose where to write it. We’ll write to the current directories normally, but allow writing to directories within tests/ as well.

To do this, we’ll restructure the overall file to be a function, and define an argument for the output directory.

In process_hx_data.py, replace this:

if __name__ == '__main__':
    """Run this file directly to generate a new set of plots for the entire
    historical period for the data included in ir_data_clean/.
    """

with this:

def process_hx_data(root_output_directory=''):
    """Process all historical data in ir_data_clean/."""

At the very end of the file, we’ll call this function:

if __name__ == '__main__':
    process_hx_data()

Readings are dumped to pkl files for more efficient processing by other scripts. The line that defines dump_filename needs to include root_output_directory:

dump_filename = f'{root_output_directory}other_output/reading_dump_{dt_last_reading_str}.pkl'

I tend to stick to 79 characters per line, but at this phase of a project I don’t always break up lines with long file paths or long strings.
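For reference, that filename just gets handed to pickle when a reading set is dumped. I’m not showing the project’s exact code for this step, but it amounts to something like this hypothetical helper:

import pickle

def dump_reading_set(reading_set, dump_filename):
    """Write one set of readings to a .pkl file for other scripts to load."""
    with open(dump_filename, 'wb') as f:
        pickle.dump(reading_set, f)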

The four calls to plot_data() and plot_data_static() also need to include root_output_directory:

            ...
            # Plot data, critical points, and slide event.
            ph.plot_data(reading_set, critical_points, known_slides,
                    root_output_directory=root_output_directory)
            ph.plot_data_static(reading_set, critical_points,
                    known_slides=known_slides,
                    root_output_directory=root_output_directory)
            plots_generated += 1

        # Any slides left in slides_in_range are unassociated.
        #   We can grab a 48-hr data set around this slide.
        for slide in slides_in_range:
            # Get first reading after this slide, and base 48 hrs around that.
            for reading in all_readings:
                if reading.dt_reading > slide.dt_slide:
                    slide_readings = a_utils.get_48hr_readings(
                            reading, all_readings)
                    break

            print(f"\nPlotting data for unassociated slide: {slide.name}")
            ph.plot_data(slide_readings, known_slides=known_slides,
                    root_output_directory=root_output_directory)
            ph.plot_data_static(slide_readings, known_slides=known_slides,
                    root_output_directory=root_output_directory)

            unassociated_slides.append(slide)
        ...

In plot_heights.py, the function definitions need to accept the root_output_directory argument, and the filename assignments need to include the value as well:

def plot_data(readings, critical_points=[], known_slides=[],
        root_output_directory=''):

    ...
    filename = f"{root_output_directory}current_ir_plots/ir_plot_{readings[-1].dt_reading.__str__()[:10]}.html"
def plot_data_static(readings, critical_points=[], known_slides=[],
        filename=None, root_output_directory=''):

    ...
    if not filename:
        filename = f"{root_output_directory}current_ir_plots/ir_plot_{readings[-1].dt_reading.__str__()[:10]}.png"

The default behavior of process_hx_data.py hasn’t changed, but now we can write tests that won’t overwrite existing output.

Testing png output

The png files are plots of readings and slide events; let’s write a test to verify that these files are correct. For example, we’ll test that the plot for the Medvejie slide event in September of 2019 that was shown earlier in the introduction exists, and that the plot matches what we currently see.

We’ll write a test that calls process_hx_data(), and here’s what we’ll test for:

- the expected png files are generated, and their contents match a set of reference files
- the expected pkl files are generated, and their contents match reference files as well
- the expected html files are generated (we’ll only check that these exist, since their contents change slightly on every run)

Let’s make a place to store tests, test output, and reference files, and create a testing file:

$ mkdir tests
$ mkdir tests/current_ir_plots
$ mkdir tests/other_output
$ mkdir tests/reference_files
$ touch tests/test_process_hx_data.py

Let’s ignore the output generated by the tests in .gitignore:

...
tests/current_ir_plots/
tests/other_output/

I also hadn’t set up the project properly. Let’s add __init__.py files to make the main project directory a package, and the tests directory a package as well:

$ touch __init__.py
$ touch tests/__init__.py

Install pytest (from the active virtual environment), and update requirements.txt:

$ pip install pytest
$ pip freeze > requirements.txt

We’ll write a stub test in test_process_hx_data.py to make sure our testing setup works:

"""Tests for process_hx_data.py.

Focuses on verifying png and pkl output.

Run this from project root directory:
$ python -m pytest
"""

from sitka_irg_analysis import process_hx_data as phd

def test_png_output():
    pass

Now run the tests, from the root directory. We’ll disable warnings for a moment, so we can write some passing tests before changing any code:

$ python -m pytest --disable-warnings
============================= test session starts ==============================
platform darwin -- Python 3.8.5, pytest-6.0.1, py-1.9.0, pluggy-0.13.1
rootdir: /Users/eric/sitka_irg_analysis
collected 1 item                                                               

tests/test_process_hx_data.py .                                          [100%]

========================= 1 passed, 1 warning in 0.64s =========================

Okay, we have a passing test because we didn’t make any assertions. Now let’s call process_hx_data(), and make sure we generate the correct files. We’ll call the function from a fixture, so it’s run once and then a number of tests can work with the results of the function. If you haven’t run into fixtures before, they’re used to reduce repetition when the output of a function needs to be fed into multiple tests.

"""Tests for process_hx_data.py.

Focuses on verifying png and pkl output.

Run this from project root directory:
$ python -m pytest
"""

from os import listdir, path
from pathlib import Path

import pytest

from sitka_irg_analysis import process_hx_data as phd


@pytest.fixture
def run_process_hx_data():
    phd.process_hx_data(root_output_directory='tests/')

def test_png_output_files_exist(run_process_hx_data):
    png_file_path = 'tests/current_ir_plots'
    current_ir_plots_files = [f for f in listdir(png_file_path)
            if path.isfile(path.join(png_file_path, f))]

    png_output_files = [f for f in current_ir_plots_files
            if Path(f).suffix=='.png']

    ref_file_path = 'tests/reference_files'
    reference_files = [f for f in listdir(ref_file_path)
            if path.isfile(path.join(ref_file_path, f))]
    png_ref_files = [f for f in reference_files
            if Path(f).suffix=='.png']

    # Assert output png file names match reference png file names.
    assert(set(png_output_files) == set(png_ref_files))

The function run_process_hx_data() is decorated by @pytest.fixture, which means pytest will handle running this function efficiently regardless of how many tests need to use it. To run a fixture before running a test, pass the name of the fixture as an argument to the test function. The function test_png_output_files_exist() finds all the files in tests/current_ir_plots/, and filters the results to get just the filenames of the png files. It then finds the png files in tests/reference_files/. Finally, it asserts that the set of png output file names matches the set of png reference file names.

Before running this test, copy the png files from current_ir_plots/ to tests/reference_files/.

This test passes, but it’s slow because it opens all the html files as they’re created. We’ll address that after we write a couple more tests.

Before we test the content of the png files, let’s refactor this code so it’s more useful for tests we know we’re going to have to write. Rather than doing all of the file processing work in test_png_output_files_exist(), we’ll move most of that work to utility functions that can be called by other test functions. We’ll also remove any previous test output before running process_hx_data():

"""Tests for process_hx_data.py.

Focuses on verifying png and pkl output.

Run this from project root directory:
$ python -m pytest
"""

from os import listdir, path
from pathlib import Path
import os, shutil

import pytest

from sitka_irg_analysis import process_hx_data as phd


@pytest.fixture(scope="session")
def run_process_hx_data():
    # Clear any previous output.
    shutil.rmtree('tests/current_ir_plots')
    shutil.rmtree('tests/other_output')
    os.mkdir('tests/current_ir_plots')
    os.mkdir('tests/other_output')

    phd.process_hx_data(root_output_directory='tests/')
    

def get_current_ir_plots_files():
    current_ir_plots_file_path = 'tests/current_ir_plots'
    return [f for f in listdir(current_ir_plots_file_path)
            if path.isfile(path.join(current_ir_plots_file_path, f))]

def get_png_output_files():
    return [f for f in get_current_ir_plots_files()
            if Path(f).suffix=='.png']

def get_reference_files():
    ref_file_path = 'tests/reference_files'
    return [f for f in listdir(ref_file_path)
            if path.isfile(path.join(ref_file_path, f))]

def get_png_reference_files():
    return [f for f in get_reference_files() if Path(f).suffix=='.png']


def test_png_output_files_exist(run_process_hx_data):
    # Assert output png file names match reference png file names.
    png_output_files = get_png_output_files()
    png_ref_files = get_png_reference_files()
    assert(set(png_output_files) == set(png_ref_files))

This is cleaner, and it will be much easier to work with as we write more tests.

Testing the content of png files

Now we can test the contents of the png files.

...
from os import listdir, path
from pathlib import Path
import os, shutil, filecmp

import pytest
...

def test_png_output_files_contents(run_process_hx_data):
    # Assert content of png files match reference files.
    png_output_files = get_png_output_files()
    png_ref_files = get_png_reference_files()
    
    # Sort both lists so matching filenames are compared to each other.
    for output_file, ref_file in zip(
            sorted(png_output_files), sorted(png_ref_files)):
        output_file = f"tests/current_ir_plots/{output_file}"
        ref_file = f"tests/reference_files/{ref_file}"
        assert(filecmp.cmp(output_file, ref_file, shallow=False))

We use the filecmp.cmp() function to compare the contents of the png files generated by the process_hx_data() function to the contents of the reference png files. The shallow=False argument makes sure we’re actually comparing the contents of the files, not just the metadata associated with the files.

Testing the content of pkl files

When process_hx_data() runs, it saves interesting sets of readings in a series of .pkl files. Let’s make sure those files are correct as well. We’ll add functions to get the .pkl output and reference files, and then tests to make sure these files exist and that their contents are correct. Note that we also need to copy the current pkl files from other_output/ to tests/reference_files/.

...
def get_png_reference_files():
    return [f for f in get_reference_files() if Path(f).suffix=='.png']

def get_pkl_output_files():
    pkl_file_path = 'tests/other_output'
    return [f for f in listdir(pkl_file_path)
            if path.isfile(path.join(pkl_file_path, f))]

def get_pkl_reference_files():
    return [f for f in get_reference_files() if Path(f).suffix=='.pkl']
...

def test_png_output_files_contents(run_process_hx_data):
    ...

def test_pkl_output_files_exist(run_process_hx_data):
    # Assert output pkl file names match reference pkl file names.
    pkl_output_files = get_pkl_output_files()
    pkl_ref_files = get_pkl_reference_files()
    assert(set(pkl_output_files) == set(pkl_ref_files))

def test_pkl_output_files_contents(run_process_hx_data):
    # Assert content of pkl files match reference files.
    pkl_output_files = get_pkl_output_files()
    pkl_ref_files = get_pkl_reference_files()

    # Sort both lists so matching filenames are compared to each other.
    for output_file, ref_file in zip(
            sorted(pkl_output_files), sorted(pkl_ref_files)):
        output_file = f"tests/other_output/{output_file}"
        ref_file = f"tests/reference_files/{ref_file}"
        assert(filecmp.cmp(output_file, ref_file, shallow=False))

All of these tests are passing, so let’s look at the html files.

Testing that html files exist

We can’t easily check the contents of the html files at the moment because they change slightly every time they’re generated, but we can check that they’ve been created. We’ll add functions to get the html output and reference files, and test that they all exist. We also need to copy the html files from current_ir_plots/ to tests/reference_files/.

...
def get_pkl_reference_files():
    return [f for f in get_reference_files() if Path(f).suffix=='.pkl']

def get_html_output_files():
    return [f for f in get_current_ir_plots_files()
            if Path(f).suffix=='.html']

def get_html_reference_files():
    return [f for f in get_reference_files() if Path(f).suffix=='.html']
...

def test_pkl_output_files_contents(run_process_hx_data):
    ...

def test_html_output_files_exist(run_process_hx_data):
    # Assert output html file names match reference html file names.
    html_output_files = get_html_output_files()
    html_ref_files = get_html_reference_files()
    assert(set(html_output_files) == set(html_ref_files))

We now have 5 tests that pass. I feel much better going into the optimization and refactoring work with these critical tests written. These tests could be refactored as well, but I’m not going to focus on that for now. I’m not sure how extensively I’ll ever test this project, so I don’t want to invest more effort in optimizing this file at the moment.

Addressing warnings

Before we focus on the project code, let’s run the tests with warnings enabled and address anything that comes up:

$ python -m pytest
============================= test session starts ==============================
platform darwin -- Python 3.8.5, pytest-6.0.1, py-1.9.0, pluggy-0.13.1
rootdir: /Users/eric/sitka_irg_analysis
collected 5 items                                                              

tests/test_process_hx_data.py .....                                      [100%]

=============================== warnings summary ===============================
irg_env/lib/python3.8/site-packages/docx/section.py:7
  /Users/eric/sitka_irg_analysis/irg_env/lib/python3.8/site-packages/docx/section.py:7: DeprecationWarning:
  
  Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3, and in 3.9 it will stop working

-- Docs: https://docs.pytest.org/en/stable/warnings.html
======================== 5 passed, 1 warning in 15.48s =========================

This is an issue in one of the libraries, python-docx. This library is up to date as of this writing, so the warning will continue to appear. We’ll just run the tests with warnings disabled most of the time; if you do this, it’s important to run your tests with warnings enabled from time to time, to make sure the rest of your code is not generating any new warnings. These tests take longer than I’d like to run, but they’ll start to run faster as we optimize the project code.

Note: The commit hash with all tests written is 1c5adaf.

Profiling

Now we’re at the fun part! We’ve got some tests written, so we can start to optimize and refactor, while staying confident that we’re not changing the output of the project. We’re going to profile process_hx_data.py in order to focus on the slowest code. Before we do that, I’m going to write down how I’d go about this if I weren’t profiling. I’m curious to see how well my intuition matches what we find from profiling.

Without profiling, I’d focus on what seems slowest and ugliest to me:

I imagine some of these things will come up in the process of profiling, but I’m expecting that other things I wouldn’t have thought of will come up as well. I’m quite curious to see what those things are.

Benchmarking

First, let’s use a simple approach. I’m on macOS, and there’s a time command you can use to see how long a process takes. It’s not a perfect benchmark, because it depends on how busy your overall system is, but it’s a reasonable measure for a project that takes on the order of 10 seconds to run. I’m not running anything else on my system that will significantly impact the execution time of this file.

Omitting the console output, here’s what I get:

$ time python process_hx_data.py
...
real  0m15.706s
user  0m14.351s
sys   0m0.998s

In real-world time, it takes about 16 seconds to run process_hx_data.py on my system. If I do this several times, I see times that differ from this on the order of one or two tenths of a second. We should keep that in mind as we try different improvements; a speedup of 1 or 2 seconds will definitely be significant, but a speedup of 0.1 or 0.2 seconds may not be very meaningful.

Now let’s use a proper profiling tool. The cProfile tool comes with Python, and keeps track of a number of metrics as a program runs. Here’s the results of running process_hx_data.py with cProfile. I’ve omitted the program’s output, and just included the first lines of profiling output:

$ python -m cProfile -s cumtime process_hx_data.py
...
         25955623 function calls (25467413 primitive calls) in 21.214 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   1732/1    0.007    0.000   21.218   21.218 {built-in method builtins.exec}
        1    0.033    0.033   21.218   21.218 process_hx_data.py:1(<module>)
        1    0.012    0.012   20.327   20.327 process_hx_data.py:43(process_hx_data)
       11    0.010    0.001    7.633    0.694 plot_heights.py:197(plot_data)
       11    0.007    0.001    7.597    0.691 offline.py:402(plot)
       22    0.000    0.000    6.112    0.278 _figure.py:55(__init__)
       22    0.002    0.000    6.111    0.278 basedatatypes.py:60(__init__)
173193/7150    0.343    0.000    5.932    0.001 basedatatypes.py:3432(__setitem__)
16462/1364    0.117    0.000    5.843    0.004 basedatatypes.py:3810(_set_compound_prop)
16473/838    0.090    0.000    5.760    0.007 basevalidators.py:2439(validate_coerce)
3622/1782    0.009    0.000    5.397    0.003 basedatatypes.py:4417(__setitem__)
        2    1.317    0.659    4.742    2.371 analysis_utils.py:151(get_first_critical_points)
       11    0.010    0.001    4.439    0.404 plot_heights.py:333(plot_data_static)
...

There were over 25 million function calls, and the file took 21.2 seconds to run. The total time using cProfile is longer than using time, because cProfile keeps track of the performance of each function as the program runs. The -s cumtime flag sorts the output by cumulative time for each function, which is what we’re most interested in when trying to make the overall project more efficient.

The first few lines are the overall times for the script, and the main function we wrote to run the script. For example, the third line of output is for the function process_hx_data(), defined on line 43 of process_hx_data.py:

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.012    0.012   20.327   20.327 process_hx_data.py:43(process_hx_data)

This function was called once, and its cumulative time was 20.327 seconds.

Here’s the same output, cleaned up a little to only include lines of code from files I wrote, and a few other lines that are worth discussing.

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       11    0.010    0.001    7.633    0.694 plot_heights.py:197(plot_data)
       11    0.007    0.001    7.597    0.691 offline.py:402(plot)
        2    1.317    0.659    4.742    2.371 analysis_utils.py:151(get_first_critical_points)
       11    0.010    0.001    4.439    0.404 plot_heights.py:333(plot_data_static)
       11    0.000    0.000    3.381    0.307 pyplot.py:719(savefig)
       11    0.000    0.000    3.381    0.307 figure.py:2038(savefig)
        1    0.674    0.674    3.164    3.164 plot_heights.py:132(get_readings_arch_format)
  2638298    1.773    0.000    2.990    0.000 ir_reading.py:11(get_slope)
   140583    0.092    0.000    2.359    0.000 {built-in method strptime}
       11    0.001    0.000    1.280    0.116 webbrowser.py:71(open)
       11    0.002    0.000    1.278    0.116 webbrowser.py:668(open)
       11    0.000    0.000    1.234    0.112 os.py:1000(close)
       13    0.000    0.000    1.233    0.095 subprocess.py:1074(wait)
       13    0.001    0.000    1.233    0.095 subprocess.py:1772(_wait)
       12    0.000    0.000    1.232    0.103 subprocess.py:1759(_try_wait)
       12    1.232    0.103    1.232    0.103 {built-in method posix.waitpid}

There’s quite a lot in here that’s interesting:

- More than half of the time is spent making plots: plot_data() accounts for about 7.6 seconds and plot_data_static() for about 4.4 seconds, and most of plot_data()’s time is inside Plotly’s offline.plot() call.
- get_slope() is called about 2.6 million times, for roughly 3 seconds of cumulative time. That’s a lot of calls for such a simple function.
- strptime() is called about 140,000 times while parsing the raw data, for over 2 seconds.
- The webbrowser and subprocess wait entries come from automatically opening every html plot in a browser tab; that’s over a second we don’t need to spend.
- get_first_critical_points() takes almost 5 seconds across just two calls.

This is much better than just guessing where to focus our optimization and refactoring efforts. Now I’m excited to start optimizing, and see how much of this we can improve!

So here’s our revised plan for optimization, based on the profiling output:

- Stop opening the html plots in the browser automatically.
- Take a look at plot_data() and the rest of the plotting code.
- Take a look at IRReading, the core data structure, since we create roughly 140,000 of these objects.
- Reduce the amount of work done in the analysis functions, especially the millions of calls to get_slope().

This isn’t a whole lot different than what I would have focused on without profiling, but we do now have an objective way to assess the impact of any optimization and refactoring work we do.

First optimization: don’t open html files automatically

For the first optimization, we’ll revise plot_data() so it doesn’t open the html files automatically. This requires two changes in plot_heights.py:

...
def plot_data(readings, critical_points=[], known_slides=[],
        root_output_directory='', auto_open=False):
        
    ...
    fig = {'data': data, 'layout': my_layout}
    filename = f"{root_output_directory}current_ir_plots/ir_plot_{readings[-1].dt_reading.__str__()[:10]}.html"
    offline.plot(fig, filename=filename, auto_open=auto_open)
    print("\nPlotted data.")

We don’t need to make any changes in process_hx_data.py, because we want to use the new default behavior of the function.

Because there are so many function calls in this project, the output of cProfile is a little difficult to look through in the terminal. I’m going to start dumping the output to a file:

$ python -m cProfile -s cumtime process_hx_data.py > tmp_profile_output.txt

I’ll also ignore this output file in .gitignore. As a side note, when I first ran this command I accidentally typed >> instead of >. The double angle brackets append output to tmp_profile_output.txt instead of rewriting the file each time. It took a little while to figure out why the profile times were exactly the same each time I ran the program. :/

Here’s the relevant part of the output:

         25954500 function calls (25466290 primitive calls) in 17.028 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       11    0.001    0.000    5.044    0.459 plot_heights.py:197(plot_data)

The overall time while profiling dropped from 21.2 seconds to 17.0 seconds, and the time spent in plot_data() dropped from 7.63 seconds to 5.04 seconds. The webbrowser entries are gone, and the wait entries are gone as well.

Let’s try timing it without profiling:

$ time python process_hx_data.py 
...
real  0m11.515s
user  0m12.320s
sys   0m0.515s

This has dropped from 15.7 seconds to 11.5 seconds, a full 4-second improvement. This is a huge improvement already, and we no longer have to deal with all those automatically opened browser tabs.

Finally, let’s try running our tests:

$ python -m pytest --disable-warnings
...
======================== 5 passed, 1 warning in 11.86s =========================

All the tests are passing, and they’re faster and easier to run as well. We’re not seeing the files pop up, but the correct files are still being created.

Note: The commit hash at this point is a140d80.

Looking at plot_data()

The function plot_data() in plot_heights.py could certainly use some refactoring, but there’s nothing obvious in here at the moment that’s slowing things down. The first half of the function is building lists and values for the plot, checking to see if there’s a relevant slide to plot, and working out an appropriate title. The second half of the function defines the data for the plot. Because of Plotly’s syntax, this section is always going to be a little long.
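If you haven’t used Plotly’s offline mode, here’s a rough idea of why that second half is long. This is a simplified sketch with made-up values, not the project’s actual plotting code:

from plotly.graph_objs import Scatter, Layout
from plotly import offline

# A minimal offline Plotly plot: build the data and layout structures,
# then write an html file. plot_data() builds much larger versions of
# these structures from the readings, critical points, and slide events.
data = [Scatter(x=[1, 2, 3], y=[21.2, 22.8, 24.1], mode='lines')]
my_layout = Layout(title='Indian River gauge height')
fig = {'data': data, 'layout': my_layout}
offline.plot(fig, filename='example_plot.html', auto_open=False)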

Let’s hold off on modifying plot_data() for the moment, and look at the core data structures first.

Looking at IRReading

The core data structure for this project is the IRReading class in utils/ir_reading.py. A reading only has two attributes that we’re interested in: the timestamp of the reading and the height of the river at that point in time. This is a simple class, but we have about 140,000 data points, so any optimization here could have a meaningful impact on the efficiency of the overall project.

I wrote this simple class quickly at the start of the project, and then focused on the analysis and visualization work. Let’s see if being more thoughtful about the structure of this class can make the project more efficient.

Using __slots__

The __slots__ attribute on a class tells Python that the attributes that are defined in the class are the only attributes that objects of the class will ever use. Normally when you create an instance from a class, Python adds a dictionary called __dict__ to the object, to keep track of attributes and their values. When we define __slots__, we’re telling Python we don’t need this dictionary. This can save a lot of memory and time when we’re making many objects from a class.
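As a quick illustration of the difference, here’s a standalone toy example (not the project’s class):

class PlainReading:
    def __init__(self, dt_reading, height):
        self.dt_reading = dt_reading
        self.height = height

class SlottedReading:
    __slots__ = ('dt_reading', 'height')

    def __init__(self, dt_reading, height):
        self.dt_reading = dt_reading
        self.height = height

plain = PlainReading(None, 21.5)
slotted = SlottedReading(None, 21.5)

# Every plain instance carries a per-instance __dict__; slotted instances don't.
print(plain.__dict__)                # {'dt_reading': None, 'height': 21.5}
print(hasattr(slotted, '__dict__'))  # False

# Slotted instances also reject attributes that aren't declared in __slots__.
try:
    slotted.notes = 'high water'
except AttributeError as e:
    print(e)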

To try this, we only need a one-line change to utils/ir_reading.py:

"""Model for working with readings."""

class IRReading:

    __slots__ = ('dt_reading', 'height')

    def __init__(self, dt_reading, height):
        """Every reading has a datetime, and a height."""
        self.dt_reading = dt_reading
        self.height = height
    ...

Here’s the output from profiling this one-line change:

         25954429 function calls (25466219 primitive calls) in 16.494 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   1732/1    0.006    0.000   16.497   16.497 {built-in method builtins.exec}
        1    0.007    0.007   16.497   16.497 process_hx_data.py:1(<module>)
        1    0.004    0.004   15.705   15.705 process_hx_data.py:43(process_hx_data)
       11    0.001    0.000    5.059    0.460 plot_heights.py:197(plot_data)

We’ve dropped from 17.0 seconds to 16.5 seconds while profiling, for another half second improvement. Running the file with time gives a user time of 11.3 seconds, a slight improvement over the 11.5 seconds we saw previously.

Let’s make sure the tests still pass:

$ python -m pytest --disable-warnings
...
=========================== short test summary info ============================
FAILED tests/test_process_hx_data.py::test_pkl_output_files_contents - Assert...
=================== 1 failed, 4 passed, 1 warning in 11.20s ====================

One test is failing: test_pkl_output_files_contents(). That makes sense, because we’re now pickling objects that have a different data structure. We’ll look at a couple other data structures before addressing this test.

Using dataclass

A dataclass is a special kind of class that’s designed for situations where the primary use of the class is to store data in the objects’ attributes. When you use the @dataclass decorator, Python automatically builds an __init__() method based on the attributes you’ve defined for the class. Using a dataclass probably won’t help much in this project, but let’s give it a try. We’ll need to declare the type of value each attribute will refer to.

Here are the changes. Again, the only changes needed are in the ir_reading.py file:

"""Model for working with readings."""

import datetime
from dataclasses import dataclass

@dataclass
class IRReading:

    dt_reading: datetime.datetime
    height: float

    def get_slope(self, reading_2):
        ...

    def get_rise(self, reading_2):
        ...

    def get_formatted_reading(self):
        ...

We need to declare the attributes we want for each instance of the class, and we don’t need an __init__() method; the @dataclass decorator creates the __init__() method for us.

Here’s the profile output:

         26469609 function calls (25981356 primitive calls) in 17.257 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   1736/1    0.028    0.000   17.260   17.260 {built-in method builtins.exec}
        1    0.012    0.012   17.260   17.260 process_hx_data.py:1(<module>)
        1    0.006    0.006   16.449   16.449 process_hx_data.py:43(process_hx_data)
       11    0.001    0.000    5.041    0.458 plot_heights.py:197(plot_data)

At 17.3 seconds this takes longer than the regular class; working with a dataclass is convenient, but not necessarily faster than a regular class. Using time, the program takes about 11.6 seconds, which is also slower than the original approach.

Let’s declare __slots__, along with using the dataclass:

...
@dataclass
class IRReading:

    __slots__ = ('dt_reading', 'height')
    dt_reading: datetime.datetime
    height: float

    def get_slope(self, reading_2):
        ...

This is about the same as using the original class, with __slots__:

         26469795 function calls (25981542 primitive calls) in 16.712 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   1736/1    0.025    0.000   16.716   16.716 {built-in method builtins.exec}
        1    0.008    0.008   16.716   16.716 process_hx_data.py:1(<module>)
        1    0.004    0.004   15.929   15.929 process_hx_data.py:43(process_hx_data)
       11    0.001    0.000    5.008    0.455 plot_heights.py:197(plot_data)

At 16.7 seconds, this is comparable to the 16.5 seconds we saw using __slots__ with the original class. Without profiling, we’re down to 11.2 seconds, which is about the same as the 11.3 seconds we saw using __slots__ previously.

Using named tuples

A named tuple is a tuple whose elements can be accessed by name using dot notation, rather than only by their index. When you’re using a class with a known set of attributes, you can often replace it with a named tuple. Named tuples can’t have methods, but if the methods you have can be replaced by functions, a named tuple might work for your use case. This approach should work in this project, so let’s give it a try.

Here’s all of ir_reading.py, using a named tuple instead of a class. I’ve cleaned up the documentation in the file as well:

"""Model for working with Indian River stream gauge readings."""

from collections import namedtuple


IRReading = namedtuple('IRReading', ['dt_reading', 'height'])


def get_slope(reading, reading_2):
    """Calculate the slope between the two points.
    Return the abs value of the slope, in ft/hr.
    Assumes reading_2 is the earlier reading.
    """

    d_height = reading.height - reading_2.height
    # Calculate time difference in hours.
    d_time = (reading.dt_reading - reading_2.dt_reading).total_seconds() / 3600
    slope = d_height / d_time

    return abs(slope)

def get_rise(reading, reading_2):
    """Calculate the rise between two points.
    Assume reading_2 is the earlier reading.
    """
    return reading.height - reading_2.height

def get_formatted_reading(reading):
    """Print a neat string of the reading."""
    dt = reading.dt_reading.strftime('%m/%d/%Y %H:%M:%S')
    return f"{dt} - {reading.height}"

The attributes of the original class are now elements in the named tuple, and the methods are standalone functions.

Instances of a named tuple are created the same way instances are created from a class, so we don’t need to touch any code that created instances of IRReading. We do, however, need to modify the calls to get_slope(), get_rise(), and get_formatted_reading().
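As a quick sanity check with made-up values, creating a reading looks exactly the same as before; only the formatting call changes from a method to a module-level function:

import datetime

import utils.ir_reading as ir_reading

reading = ir_reading.IRReading(
    dt_reading=datetime.datetime(2019, 9, 21, 13, 0), height=23.8)
print(ir_reading.get_formatted_reading(reading))
# 09/21/2019 13:00:00 - 23.8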

In process_hx_data.py, there’s one call to get_formatted_reading(), so we need to import the ir_reading module:

import pickle

import plot_heights as ph
from slide_event import SlideEvent
import utils.analysis_utils as a_utils
import utils.ir_reading as ir_reading
from utils.analysis_utils import RISE_CRITICAL, M_CRITICAL

And we need to change this method call:

print(reading.get_formatted_reading())

To a simple function call:

print(ir_reading.get_formatted_reading(reading))

In plot_heights.py, we need to modify the import statement and replace all occurrences of IRReading with ir_reading.IRReading.

There are four calls to get_formatted_reading() in this file. Each of these looks something like this:

print(f"  First reading: {readings[0].get_formatted_reading()}")

These calls need to be modified to look like this:

print(f"  First reading: {ir_reading.get_formatted_reading(readings[0])}")

Also, there’s a line where the value of an attribute is modified:

reading.height = critical_height

You can’t modify the values of elements in named tuples, but there’s a _replace() method that can help us:

reading = reading._replace(height=critical_height)

This should replace the existing reading object with a new reading object that has the correct height.

Finally, we need to update analysis_utils.py. We need to update the import statement, one call to IRReading(), two calls to get_slope(), and two calls to get_rise(). All of the calls to get_formatted_reading() in this file have been commented out, so we’ll leave those be for the moment.

The original calls to the get_slope() method looked like this:

m = reading.get_slope(prev_reading)

These should now look like this:

m = ir_reading.get_slope(reading, prev_reading)

Similarly, the calls to get_rise() should now look like this:

rise = ir_reading.get_rise(reading, prev_reading)

With these changes, we should be able to profile process_hx_data.py again. Here are the results:

         23458812 function calls (22970601 primitive calls) in 15.889 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   1733/1    0.006    0.000   15.893   15.893 {built-in method builtins.exec}
        1    0.008    0.008   15.893   15.893 process_hx_data.py:1(<module>)
        1    0.007    0.007   15.110   15.110 process_hx_data.py:44(process_hx_data)
       11    0.001    0.000    5.028    0.457 plot_heights.py:197(plot_data)

We’re now under 16 seconds while profiling. Without profiling we’re down to 10.9 seconds, and this is the first run I’ve seen under 11 seconds.

Summary

Here’s a summary of how the choice of data structure impacted performance in this project, as it’s currently written:

Data structure                  cProfile measurement    time measurement
Regular class                   17.0 s                  11.5 s
Regular class with __slots__    16.5 s                  11.3 s
Dataclass                       17.3 s                  11.6 s
Dataclass with __slots__        16.7 s                  11.2 s
Named tuple                     15.9 s                  10.9 s

In this project, we’ll go with named tuples. They give the most significant speedup, and they should be pretty straightforward to work with in the context of this project.

Note: If you want to run any code after this without making the above changes yourself, the commit hash for implementing named tuples is 6393e95.

Fixing tests

At this point I’m expecting all tests to still pass, except the test for the contents of the pkl files. We’ve changed our data structure, so when we save a set of data points using the structure we’ve defined (IRReading), those files will be different now.

Let’s run the test and verify this is correct:

$ python -m pytest --disable-warnings
...
=========================== short test summary info ============================
FAILED tests/test_process_hx_data.py::test_png_output_files_contents - Assert...
FAILED tests/test_process_hx_data.py::test_pkl_output_files_contents - Assert...
=================== 2 failed, 3 passed, 1 warning in 11.05s ====================

Now two tests are failing! The pkl files are incorrect, as expected, but so are the contents of the png files. Let’s take a look at one of the files. I always look at the event from 9/21/2019, because there was a slide with critical points on that date. Here’s what that file looks like at the moment:

The critical region for the previous hours is not being calculated (or plotted) correctly.

Failing tests like this actually feel good, because you know the tests are doing their job. Let’s take a look at what’s happening here. Here’s the current code that calculates the critical region in the static plots, in plot_data_static() in plot_heights.py:

    dt_first_min_prev_reading = latest_reading.dt_reading - datetime.timedelta(hours=12)
    min_crit_prev_readings = [ir_reading.IRReading(r.dt_reading, 27.0)
                                for r in readings
                                if r.dt_reading >= dt_first_min_prev_reading]

    for reading in min_crit_prev_readings:
        dt_lookback = reading.dt_reading - datetime.timedelta(hours=5)
        # Get minimum height from last 5 hours of readings.
        relevant_readings = [r for r in readings
            if (r.dt_reading >= dt_lookback) and (r.dt_reading < reading.dt_reading)]
        critical_height = min([r.height for r in relevant_readings]) + 2.5

        # Make sure critical_height also gives a 5-hour average rise at least
        #   as great as M_CRITICAL. Units are ft/hr.
        m_avg = (critical_height - relevant_readings[0].height) / 5
        if m_avg < 0.5:
            # The critical height satisfies total rise, but not sustained rate
            #   of rise. Bump critical height so it satisfies total rise and
            #   rate of rise.
            critical_height = 5 * 0.5 + relevant_readings[0].height

        # reading.height = critical_height
        reading = reading._replace(height=critical_height)

You don’t have to understand this code well at all to get a sense of what’s going on. Basically, this code creates a set of readings over the previous 12 hours. It then iterates over those readings, calculates the minimum river height that would make conditions critical, and changes the reading height to match that value.

This logic still works, but it doesn’t work for named tuples. Instead of creating the readings first, we’ll create an empty list of IRReading objects, create the timestamps we want to loop over, and then create those objects inside the loop once we know what the height for each point should be. This approach would have worked for the original data structure as well, but I didn’t happen to choose that particular approach.

Here’s the revised code:

    dt_first_min_prev_reading = latest_reading.dt_reading - datetime.timedelta(hours=12)
    min_crit_prev_readings = []
    prev_datetimes = [r.dt_reading for r in readings
                        if r.dt_reading >= dt_first_min_prev_reading]

    for dt in prev_datetimes:
        dt_lookback = dt - datetime.timedelta(hours=5)
        # Get minimum height from last 5 hours of readings.
        relevant_readings = [r for r in readings
            if (r.dt_reading >= dt_lookback) and (r.dt_reading < dt)]
        critical_height = min([r.height for r in relevant_readings]) + 2.5

        # Make sure critical_height also gives a 5-hour average rise at least
        #   as great as M_CRITICAL. Units are ft/hr.
        m_avg = (critical_height - relevant_readings[0].height) / 5
        if m_avg < 0.5:
            # The critical height satisfies total rise, but not sustained rate
            #   of rise. Bump critical height so it satisfies total rise and
            #   rate of rise.
            critical_height = 5 * 0.5 + relevant_readings[0].height

        reading = ir_reading.IRReading(dt, critical_height)
        min_crit_prev_readings.append(reading)

This should work. Let’s see if it does, by running our tests again:

$ python -m pytest --disable-warnings
...
=========================== short test summary info ============================
FAILED tests/test_process_hx_data.py::test_pkl_output_files_contents - Assert...
=================== 1 failed, 4 passed, 1 warning in 11.07s ====================

We have one failing test, but that’s the test about pkl file contents. The png files are correct once again. Opening the file at tests/current_ir_plots/ir_plot_2019-09-21.png shows that the critical region is being calculated correctly again.

This sure shows the value of tests. I won’t show it here, but a spot check of the file ir_plot_2019-09-21.html shows it’s still correct as well; it doesn’t calculate a critical region for previous timestamps, so I don’t think there was ever an issue with it.

Fixing the failing pkl test

I’m going to assume the new .pkl files are correct for now. To fix this test, we’ll simply copy the new .pkl files from other_output/ to tests/reference_files/.

Now all of our tests pass!

$ python -m pytest --disable-warnings
...
======================== 5 passed, 1 warning in 10.85s =========================

We’ve changed a bit of code, so let’s run the profile again and see where we’re at:

         23457672 function calls (22969461 primitive calls) in 15.901 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   1733/1    0.006    0.000   15.905   15.905 {built-in method builtins.exec}
        1    0.008    0.008   15.905   15.905 process_hx_data.py:1(<module>)
        1    0.004    0.004   15.113   15.113 process_hx_data.py:44(process_hx_data)
       11    0.001    0.000    5.069    0.461 plot_heights.py:197(plot_data)

We’re right where we were at 15.9 seconds. Without profiling, we’re at 11.0 seconds, within a tenth of a second of the previous time. I’m seeing variations in the results of time profiling on the order of one or two tenths of a second, so this is not a significant difference.

Note: The commit hash at this point is c6f2722.

Doing less work

All of this focus on performance has kept me thinking about how to stop doing any unnecessary work. Over dinner, I suddenly realized there are many points that we know are not critical without examining them in detail. I’ve had a sense of this from looking at plots for a long time, but I’ve never focused on it enough to specify what the conditions are, and how to identify them in code.

If we know the critical rise is, for example, 2.5 feet, then we should be able to identify a threshold below which we never check for critical points. For example if the river’s base height is 21 feet, then we never need to check for critical points when the river is less than 23.5 feet. In code, we won’t check for critical points when

reading.height < RIVER_MIN_HEIGHT + RISE_CRITICAL

This should cut out a lot of data processing! Let’s give it a try. This improvement won’t be anything Python-specific, but spending time focused on optimizing Python means I’m also thinking about how to optimize the logic of the program as well; these things can easily go hand in hand.

All of the necessary changes are in analysis_utils.py. First, we’ll define a minimum height for the river:

...
RISE_CRITICAL = 2.5
M_CRITICAL = 0.5
RIVER_MIN_HEIGHT = 20.5
...

Then, we’ll add an if-block in get_critical_points() that continues the loop if the reading height is below the critical value we just identified:

  ...
    for reading_index, reading in enumerate(readings[max_lookback:]):
        # print(f"  Examining reading: {reading.get_formatted_reading()}")

        # If reading is below the base level of the river plus the minimum
        #   critical rise, this reading can't be critical, and we don't need
        #   to examine it.
        if reading.height < RIVER_MIN_HEIGHT + RISE_CRITICAL:
            continue

        # Get prev max_lookback readings.
        prev_readings = [reading for reading in readings[reading_index-max_lookback:reading_index]]
        for prev_reading in prev_readings:
            ...

We also need to add this same block to get_first_critical_points():

    ...
    first_critical_points = []
    # Start with 10th reading, so can look back.
    for reading_index, reading in enumerate(readings[max_lookback:]):
        # print(f"  Examining reading: {reading.get_formatted_reading()}")

        # If reading is below the base level of the river plus the minimum
        #   critical rise, this reading can't be critical, and we don't need
        #   to examine it.
        if reading.height < RIVER_MIN_HEIGHT + RISE_CRITICAL:
            continue
        
        # Get prev max_lookback readings.
        prev_readings = [reading for reading in readings[reading_index-max_lookback:reading_index]]
        for prev_reading in prev_readings:
            ...

Profiling again shows how much time we’re saving:

         12854010 function calls (12365798 primitive calls) in 13.350 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   1733/1    0.011    0.000   13.354   13.354 {built-in method builtins.exec}
        1    0.009    0.009   13.354   13.354 process_hx_data.py:1(<module>)
        1    0.004    0.004   11.797   11.797 process_hx_data.py:44(process_hx_data)
       11    0.001    0.000    5.637    0.512 plot_heights.py:197(plot_data)

This is pretty interesting output. We’ve dropped from about 15.9 seconds to about 13.4 seconds, for a savings of about 2.5 seconds! There are a number of important things to notice in the output. First, the total number of function calls has dropped from about 23 million to just under 13 million. I don’t know why plot_data() is taking more time (from about 5 seconds to over 5.5 seconds), but the savings in other areas more than make up for this.

Earlier we noted that there were 2.6 million calls to get_slope(), which took about 3 seconds. Searching the profile output for get_slope, we find this line:

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    22518    0.012    0.000    0.019    0.000 ir_reading.py:9(get_slope)

We’re only calling this function about 22,500 times now, and the cumulative time spent in this function has dropped from about 3 seconds to about two hundredths of a second. That’s a huge improvement.

Running the file with time, process_hx_data.py now takes only 8.9 seconds. That’s a full 2-second speedup, and it’s nice to be under 10 seconds for the first time.

I have to admit that when I first tried this optimization, I only modified get_critical_points() and didn’t notice that it applies to get_first_critical_points() as well. The change only saved a tenth of a second or so, and I was really confused; I thought it would have a much bigger impact. It wasn’t until later in the process, after some other optimizations, that I noticed get_first_critical_points() was still high up in the profile output, taking more time than I thought it should. It was quite validating to see that adding this check to get_first_critical_points() gave the optimization the kind of impact I expected. It was also a reminder to trust the profile output to guide refactoring and optimization work. Without the profiler, I might have gone a long time without noticing exactly where to apply this optimization.

Note: The commit hash at this point is c6edbc3.

Parallel processing

Let’s take a look at parallelization. I believe this should have a pretty significant impact on the efficiency of this project, because we’re working with multiple data files and generating many output files. Also, parallelization requires functions with a clear, focused purpose, so implementing it should push us to reorganize the project in a much clearer way as well.

Right now, process_hx_data() is processing raw data files, storing data values, and making multiple kinds of plots and Python-specific data files. This is all happening in a somewhat haphazard manner, which is a symptom of writing exploratory procedural code without being mindful of long-term efficiency. To fix this, we want to write a number of functions that each do one specific task:

- get_readings_from_data_file() parses one raw data file and returns a list of readings.
- get_reading_sets() pulls out the sets of readings we care about, around critical points and unassociated slides.
- pickle_reading_set() saves a reading set for faster analysis and plotting later.
- generate_interactive_plot() and generate_static_plot() each make one plot from one reading set.
- summarize_results() reports the overall results of the analysis.

This will also push us to define the data structures that we’re working with as we call these functions. Right now these are a bunch of variables in the middle of the file, such as notifications_issued, associated_notifications, and more. We’ll need to structure these differently so they can be passed between functions, and so they can be built up in a parallel workflow.

We’ll first restructure the entire file to use the functions listed above, and make sure our tests still pass. The execution time shouldn’t change a whole lot. Then we’ll try to parallelize the plots, which should be straightforward because each plot is independent of the other plots. Then we’ll look at parallelizing the data processing and analysis, which will probably be more work because data from one file is combined with data from another file to produce overall results.

Reorganizing process_hx_data.py

Here’s the new process_hx_data.py:

Note: This listing reflects a later revision that combines the two kinds of reading sets in get_reading_sets().

"""Process all historical IR gauge data.

This file loads historical data from the Indian River stream gauge. It loads
  values for critical rate of rise, and critical total rise. It then runs
  through all the stream gauge data. It flags any data point at which the 
  stream gauge has met or exceeded these critical values, and captures a
  48-hour period of readings around that point. When these readings are
  plotted, any known slides that occurred during this period are plotted as
  well.

  The script then processes known slide events. If a slide event is not
  already associated with a critical period, 48 hours of readings around that
  slide event are grabbed, and these are plotted as well.

We want to be able to use the output to answer the following questions. In
  this context, 'notification time' refers to the time between the first
  critical point identified in a critical period, and the time at which a
  slide occurred. This is the amount of time people would have to respond
  if a notification were issued the moment the first critical point was
  identified.

  A true positive is a critical point associated with a slide,
  a false positive is a critical point with no associated slide. A false 
  negative is a slide with no associated critical point.

  - How many slides were preceded by a critical point?
  - What do the notification times look like?
  - How many true and false positives and false negatives were there?
  - How many notifications would have been issued over a 5-year period?
  - Was there anything special about slides that were missed? (false negative)
  - Was there anything special about critical points with no slide? (false
      positive)
"""

import pickle

import plot_heights as ph
from slide_event import SlideEvent
import utils.analysis_utils as a_utils
import utils.ir_reading as ir_reading
from utils.analysis_utils import RISE_CRITICAL, M_CRITICAL


# Track overall stats.
#   How many notifications followed by slides?
#   How many notifications not followed by slides?
#   How many slides were missed?
stats = {
    "notifications_issued": 0,
    "associated_notifications": 0,
    "unassociated_notifications": 0,
    "unassociated_notification_points": [],
    "relevant_slides": [],
    "unassociated_slides": [],
    "notification_times": {},
    "earliest_reading": None,
    "latest_reading": None,
}


def get_readings_from_data_file(data_file):
    """Process a single data file.
    Returns a list of IRReading objects.
    """
    # Use proper parsing function.
    if 'hx_format' in data_file:
        all_readings = ph.get_readings_hx_format(data_file)
    elif 'arch_format' in data_file:
        all_readings = ph.get_readings_arch_format(data_file)

    return all_readings


def get_reading_sets(readings, known_slides, stats):
    """Takes in a single list of readings, and returns two lists,
    critical_reading_sets and slide_readings_sets.

    critical_reading_sets: lists of readings around critical events, which
      may or may not include a slide
    slide_reading_sets: lists of readings around slide events that are not
      associated with critical points.

    Updates stats.
    """
    # Keep track of earliest and latest reading across all data files.
    #   DEV: This is probably better calculated later, in a separate fn.
    if not stats['earliest_reading']:
        stats['earliest_reading'] = readings[0]
        stats['latest_reading'] = readings[-1]
    else:
        if readings[0].dt_reading < stats['earliest_reading'].dt_reading:
            stats['earliest_reading'] = readings[0]
        if readings[-1].dt_reading > stats['latest_reading'].dt_reading:
            stats['latest_reading'] = readings[-1]

    # Get all the known slides that occurred during these readings.
    slides_in_range = a_utils.get_slides_in_range(
            known_slides, readings)

    # Find the start of all critical periods in this data file.
    first_critical_points = a_utils.get_first_critical_points(readings)
    for reading in first_critical_points:
        print(ir_reading.get_formatted_reading(reading))
    stats['notifications_issued'] += len(first_critical_points)

    # critical_reading_sets is a list of lists. Each list is a set of
    #   readings to plot, based around a first critical point.
    critical_reading_sets = [a_utils.get_48hr_readings(fcp, readings)
                                    for fcp in first_critical_points]

    # Determine which critical sets are associated with slides, so we can
    #   process readings for unassociated slides and build
    #   slide_reading_sets.
    for reading_set in critical_reading_sets:
        critical_points = a_utils.get_critical_points(reading_set)
        relevant_slide = ph.get_relevant_slide(reading_set, known_slides)
        if relevant_slide:
            stats['relevant_slides'].append(relevant_slide)
            stats['associated_notifications'] += 1
            notification_time = ph.get_notification_time(critical_points,
                    relevant_slide)
            stats['notification_times'][relevant_slide] = notification_time
            # Remove this slide from slides_in_range, so we'll
            #   be left with unassociated slides.
            slides_in_range.remove(relevant_slide)
        else:
            # This may be an unassociated notification.
            stats['unassociated_notification_points'].append(
                    critical_points[0])
            stats['unassociated_notifications'] += 1

    # Any slides left in slides_in_range are unassociated.
    #   We can grab a 48-hr data set around each of these slides.
    slide_reading_sets = []
    for slide in slides_in_range:
        # Get first reading after this slide, and base 48 hrs around that.
        for reading in readings:
            if reading.dt_reading > slide.dt_slide:
                slide_readings = a_utils.get_48hr_readings(
                        reading, readings)
                slide_reading_sets.append(slide_readings)
                break

        stats['unassociated_slides'].append(slide)

    # We aren't processing reading sets separately, so combine them here.
    return critical_reading_sets + slide_reading_sets


def pickle_reading_set(reading_set, root_output_directory=''):
    """Pickle a reading set, for further analysis and quicker plotting."""
    dt_last_reading_str = reading_set[-1].dt_reading.strftime('%m%d%Y')
    dump_filename = f'{root_output_directory}other_output/reading_dump_{dt_last_reading_str}.pkl'
    with open(dump_filename, 'wb') as f:
        pickle.dump(reading_set, f)

def generate_interactive_plot(
            reading_set, known_slides, root_output_directory):
    """Generate an interactive html plot."""
    critical_points = a_utils.get_critical_points(reading_set)
    ph.plot_data(reading_set, known_slides=known_slides,
        critical_points=critical_points,
        root_output_directory=root_output_directory)


def generate_static_plot(
            reading_set, known_slides, root_output_directory):
    """Genereate a static plot image."""
    critical_points = a_utils.get_critical_points(reading_set)
    ph.plot_data_static(reading_set, known_slides=known_slides,
        critical_points=critical_points,
        root_output_directory=root_output_directory)


def summarize_results(known_slides, stats):
    """Summarize results of analysis."""
    assert(stats['unassociated_notifications']
            == len(stats['unassociated_notification_points']))
    stats['unassociated_slides'] = set(known_slides) - set(stats['relevant_slides'])
    slides_outside_range = []
    for slide in known_slides:
        if ( (slide.dt_slide < stats['earliest_reading'].dt_reading)
                 or (slide.dt_slide > stats['latest_reading'].dt_reading) ):
            stats['unassociated_slides'].remove(slide)
            slides_outside_range.append(slide)
    start_str = stats['earliest_reading'].dt_reading.strftime('%m/%d/%Y')
    end_str = stats['latest_reading'].dt_reading.strftime('%m/%d/%Y')
    print("\n\n --- Final Results ---\n")
    print(f"Data analyzed from: {start_str} to {end_str}")
    print(f"  Critical rise used: {RISE_CRITICAL} feet")
    print(f"  Critical rise rate used: {M_CRITICAL} ft/hr")

    print(f"\nNotifications Issued: {stats['notifications_issued']}")
    print(f"\nTrue Positives: {stats['associated_notifications']}")
    for slide in stats['relevant_slides']:
        print(f"  {slide.name} - Notification time: {stats['notification_times'][slide]} minutes")
    print(f"\nFalse Positives: {stats['unassociated_notifications']}")
    for notification_point in stats['unassociated_notification_points']:
        print(f"  {notification_point.dt_reading.strftime('%m/%d/%Y %H:%M:%S')}")

    print(f"\nFalse Negatives: {len(stats['unassociated_slides'])}")
    for slide in stats['unassociated_slides']:
        print(f"  {slide.name}")
    print(f"\nSlides outside range: {len(slides_outside_range)}")
    for slide in slides_outside_range:
        print(f"  {slide.name}")


def process_hx_data(root_output_directory=''):
    """Process all historical data in ir_data_clean/.

    - Get known slide events.
    - Get readings from file.
    - Pull interesting reading sets from readings. Analysis is done here.
    - Pickle reading sets.
    - Plot reading sets.
    - Summarize results.

    Does not return anything, but generates:
    - pkl files of reading sets.
    - html files containing interactive plots.
    - png files containing static plots.
    - console output summarizing what was found.
    """

    # Get known slides.
    slides_file = 'known_slides/known_slides.json'
    known_slides = SlideEvent.load_slides(slides_file)

    # DEV: Should probably walk the ir_data_clean directory, instead of making
    #      this list manually.
    data_files = [
        'ir_data_clean/irva_utc_072014-022016_hx_format.txt',
        'ir_data_clean/irva_akdt_022016-102019_arch_format.txt',
    ]

    for data_file in data_files:
        readings = get_readings_from_data_file(data_file)
        reading_sets = get_reading_sets(
                readings, known_slides, stats)

        # Pickle reading sets for faster analysis and plotting later,
        #   and for use by other programs.
        for reading_set in reading_sets:
            print("Pickling reading sets...")
            pickle_reading_set(reading_set, root_output_directory)

        # Generate interactive plots.
        for reading_set in reading_sets:
            print("Generating interactive plots...")
            generate_interactive_plot(
                    reading_set, known_slides, root_output_directory)

        # Generate static plots.
        for reading_set in reading_sets:
            print("Generating static plots...")
            generate_static_plot(
                    reading_set, known_slides, root_output_directory)

    summarize_results(known_slides, stats)


if __name__ == '__main__':
    process_hx_data()

Reorganizing this file took a bit of work; I ran into failing tests numerous times as I separated the existing code into the functions shown here.

At one point the tests for pkl files failed, because more pkl files were being generated than expected. This led me to discover that pkl files had not previously been generated for periods where slides occurred outside of a critical period. That makes sense in retrospect, because there were 11 html files and 11 png files, but only 9 pkl files. I copied the two new pkl files (reading_dump_09192014.pkl and reading_dump_09172016.pkl) to tests/reference_files/. This again shows the value of having a minimal set of critical tests, and of a clean code structure.

With all tests passing, let’s run the profile:

         12913888 function calls (12425677 primitive calls) in 11.968 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   1733/1    0.007    0.000   11.972   11.972 {built-in method builtins.exec}
        1    0.008    0.008   11.972   11.972 process_hx_data.py:1(<module>)
        1    0.001    0.001   11.180   11.180 process_hx_data.py:209(process_hx_data)
       11    0.000    0.000    5.033    0.458 process_hx_data.py:157(generate_interactive_plot)
       11    0.001    0.000    5.021    0.456 plot_heights.py:197(plot_data)

The profiling time has dropped from about 13.3 seconds to just under 12 seconds. I’m not quite sure why this is happening, but it’s a nice side effect of more organized code. Without profiling, the program runs in about 8.8 seconds, an insignificant 0.1-second change.

Note: The commit for this rewrite is 0be1b17.

Generating interactive plots in parallel

Now that we’re calling a single function in a loop to generate interactive plots, we can do this step in parallel. In preparing this post, I played around with both multiprocessing and multithreading. They both have issues that would need to be addressed. We’ll start by looking at multithreading, because it’s a little easier to profile and easier to see what’s going on.

Matplotlib doesn’t work well with multithreading, so we’ll comment out the calls to generate_static_plot(). We’ll also comment out the calls to pickle_reading_set(), so we can focus on the impact of making multithreaded calls to generate_interactive_plot().

At the top of the file, add the following import statement:

from concurrent.futures import ThreadPoolExecutor

This is a class that handles making threads, carrying out their work, and waiting for each thread to finish before moving on in the program’s execution.
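To see the pattern in isolation, here’s a minimal, self-contained sketch with a toy function standing in for the plotting work; none of these names come from the project:

# Minimal illustration of the ThreadPoolExecutor pattern; save_plot and
#   reading_sets here are placeholders, not project code.
from concurrent.futures import ThreadPoolExecutor

def save_plot(reading_set):
    """Stand-in for the real plotting work."""
    print(f"Plotting {len(reading_set)} readings...")

reading_sets = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

# The with block waits for all submitted work to finish before continuing.
with ThreadPoolExecutor(max_workers=2) as executor:
    executor.map(save_plot, reading_sets)

print("All plots finished.")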

We also need to modify the generate_interactive_plot() function so it’s a little easier to work with. Multithreading is simpler to implement if a function only takes one argument, so we’ll get rid of noncritical arguments for the moment. Here’s the modified version of this function:

def generate_interactive_plot(reading_set):
    """Generate an interactive html plot."""
    # Get known slides.
    slides_file = 'known_slides/known_slides.json'
    known_slides = SlideEvent.load_slides(slides_file)

    critical_points = a_utils.get_critical_points(reading_set)
    ph.plot_data(reading_set, known_slides=known_slides,
        critical_points=critical_points)

We pare the arguments down to just reading_set. We get known_slides inside the function, and we leave out the root_output_directory argument for now. This version of the function will be simpler to call when trying parallel approaches.
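If we wanted to keep the original three-argument signature, another option would be functools.partial, which freezes the extra arguments into a one-argument callable that executor.map() can use. Here’s a sketch of that approach, assuming the original signature and the names from the surrounding loop; it’s not what I actually did here:

# Hypothetical alternative: freeze the extra arguments with functools.partial
#   so executor.map() still only passes reading_set to each call.
from functools import partial
from concurrent.futures import ThreadPoolExecutor

plot_one = partial(generate_interactive_plot,
        known_slides=known_slides,
        root_output_directory=root_output_directory)

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.map(plot_one, reading_sets)

Either approach works; paring the arguments down just keeps the examples in this post simpler.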

Here’s the loop where we process each data file. Notice I’ve commented out everything except the code that gets the reading sets, and the calls to generate_interactive_plot(). This code does not use parallel processing yet; we’re going to profile this code, and then run it in parallel and see what the difference is.

    for data_file in data_files:
        readings = get_readings_from_data_file(data_file)
        reading_sets = get_reading_sets(
                readings, known_slides, stats)

        # # Pickle reading sets for faster analysis and plotting later,
        # #   and for use by other programs.
        # for reading_set in reading_sets:
        #     print("Pickling reading sets...")
        #     pickle_reading_set(reading_set, root_output_directory)

        # # Generate interactive plots.
        # for reading_set in reading_sets:
        #     print("Generating interactive plots...")
        #     generate_interactive_plot(
        #             reading_set, known_slides, root_output_directory)

        for reading_set in reading_sets:
            generate_interactive_plot(reading_set)

        # # Generate static plots.
        # for reading_set in reading_sets:
        #     print("Generating static plots...")
        #     generate_static_plot(
        #             reading_set, known_slides, root_output_directory)

Here’s the profile output:

         11821401 function calls (11354079 primitive calls) in 9.115 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   1738/1    0.006    0.000    9.118    9.118 {built-in method builtins.exec}
        1    0.008    0.008    9.118    9.118 process_hx_data.py:1(<module>)
        1    0.001    0.001    8.332    8.332 process_hx_data.py:220(process_hx_data)
       11    0.000    0.000    5.116    0.465 process_hx_data.py:166(generate_interactive_plot)
       11    0.001    0.000    5.093    0.463 plot_heights.py:197(plot_data)

Profiling takes about 9.1 seconds. Without profiling, the file takes about 6.2 seconds to run.

Now let’s try it using multithreading. Here are the changes to the loop over the data files:

    for data_file in data_files:
        
        ...
        # for reading_set in reading_sets:
        #     generate_interactive_plot(reading_set)

        with ThreadPoolExecutor(max_workers=2) as executor:
            executor.map(generate_interactive_plot, reading_sets)

        # Generate static plots.
        # for reading_set in reading_sets:
        #     print("Generating static plots...")
        #     generate_static_plot(reading_set)

ThreadPoolExecutor accepts an argument for the number of workers, and maps the function we’re calling onto the data we’re using. It sends each element of reading_sets to the function generate_interactive_plot(), using more than one thread. The with statement makes sure all of this is finished before the program moves on.

Let’s profile this approach:

         4741788 function calls (4726594 primitive calls) in 7.510 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
   1200/1    0.005    0.000    7.511    7.511 {built-in method builtins.exec}
        1    0.015    0.015    7.511    7.511 process_hx_data.py:1(<module>)
        1    0.001    0.001    6.722    6.722 process_hx_data.py:220(process_hx_data)
       66    3.543    0.054    3.543    0.054 {method 'acquire' of '_thread.lock' objects}
        2    0.000    0.000    3.543    1.772 _base.py:635(__exit__)
        2    0.000    0.000    3.543    1.771 thread.py:230(shutdown)
        4    0.000    0.000    3.543    0.886 threading.py:979(join)
        4    0.000    0.000    3.543    0.886 threading.py:1017(_wait_for_tstate_lock)

It’s pretty interesting to look at this output. The overall time while profiling went from 9.1 seconds to 7.5 seconds (remember we’re not generating pkl or png files). If you search the profiling output, you won’t find a line for generate_interactive_plot(); cProfile only traces the thread it’s running in, and those calls now happen in worker threads. In the first lines of output, though, you can see the time spent acquiring thread locks, joining threads (waiting for all threads to complete before moving on), and shutting down the executor. This is comparable to the times we were seeing for plot_data() previously.

I’m running this on a machine that has two physical cores and four logical cores. My first run using max_workers=4 was slower, at about 8.2 seconds. If you’re running this code, find out how many cores your processor has, and try a few values for max_workers to see what works best on your system.
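If you’re not sure how many cores you have, a quick check looks like this (a generic snippet, not part of the project):

# Quick check of core counts.
import os
import multiprocessing

print(os.cpu_count())               # logical cores (4 on the machine described above)
print(multiprocessing.cpu_count())  # same information via multiprocessing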

But we don’t usually run our code through a profiler. Without profiling, the file takes about 6.7 seconds to run with 2 workers, and about 7.2 seconds with 4 workers. Both of these are slower than the 6.2 seconds it took without parallel processing. My tentative conclusion is that the multithreaded run is cheaper for cProfile to trace, but the overhead of multithreading outweighs any savings when we run without profiling. Since the plotting work is mostly CPU-bound Python code, the GIL also likely keeps the threads from running in parallel in the first place.

Before making any decisions about how to proceed, let’s take a look at multiprocessing.

Note: The commit at this point is 6f51854.

Multiprocessing

To start, add an import multiprocessing statement at the top of the file. We don’t need to take out any of the threading code; it’s nice to keep it in for comparison. Instead, comment out the two multithreading lines, and add these lines to try multiprocessing:

        # for reading_set in reading_sets:
        #     generate_interactive_plot(reading_set)

        # with ThreadPoolExecutor(max_workers=2) as executor:
        #     executor.map(generate_interactive_plot, reading_sets)

        pool = multiprocessing.Pool(processes=2)
        pool.map(generate_interactive_plot, reading_sets)

This creates a pool of processes, onto which the calls to generate_interactive_plot() are mapped.
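As an aside, the pool can also be used as a context manager, which terminates it automatically when the block ends. Here’s a sketch of that equivalent variant, using the same names as the surrounding code:

        # Equivalent variant: the with block terminates the pool for us.
        with multiprocessing.Pool(processes=2) as pool:
            pool.map(generate_interactive_plot, reading_sets)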

Profiling multiprocessing code is harder, so let’s use a simple time call to make our comparisons.

$ time python process_hx_data.py
...
real  0m6.450s
user  0m9.421s
sys 0m0.869s

This is the result using processes=2, which is faster than using 4 processes on my system. This is comparable to the time using multithreading, but still slower than not using parallel processing at all.

I was surprised by these results, so I did a little more research. It turns out multiprocessing is good for running large batches of work, but not for running just a few iterations. The overhead of setting up, managing, and closing the processes isn’t worth the savings from using more than one process unless you’re repeating your tasks often enough. The same holds true for multithreading in this case. For example, if we were making 1,000 calls of one second each, parallel processing would probably make a significant difference. But it doesn’t help much when you’re making about 10 calls of half a second each, as we are here.
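To see where that crossover happens on your own machine, here’s a small standalone experiment with a made-up, CPU-bound stand-in for the plotting work; the function names and workload sizes are invented for illustration:

"""Toy comparison of serial vs. multiprocessing overhead.

This is a standalone sketch, not project code; the workload is made up
  to illustrate the point about batch size.
"""
import multiprocessing
from time import perf_counter

def fake_plot(n):
    """CPU-bound stand-in for generating one plot."""
    return sum(i * i for i in range(n))

def run_serial(tasks):
    start = perf_counter()
    for n in tasks:
        fake_plot(n)
    return perf_counter() - start

def run_pool(tasks, processes=2):
    start = perf_counter()
    with multiprocessing.Pool(processes=processes) as pool:
        pool.map(fake_plot, tasks)
    return perf_counter() - start

if __name__ == '__main__':
    few_tasks = [2_000_000] * 10      # roughly like our 11 plots
    many_tasks = [2_000_000] * 200    # a much larger batch

    print(f"10 tasks,  serial: {run_serial(few_tasks):.2f}s  "
          f"pool: {run_pool(few_tasks):.2f}s")
    print(f"200 tasks, serial: {run_serial(many_tasks):.2f}s  "
          f"pool: {run_pool(many_tasks):.2f}s")

Varying the number and size of tasks shows how the pool’s fixed overhead matters less and less as the batch grows.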

We’re deep enough in now that we can do a really interesting experiment with one line of code.

Note: The commit hash here is 84b965d.

How helpful would multiprocessing be for many plots?

What would these times be like if we were generating more than 11 plots? Python lets you multiply a list by a number, and get a list with the original elements repeated:

>>> my_list = [ ['a', 'b'], ['c', 'd'] ]
>>> my_list * 2
[['a', 'b'], ['c', 'd'], ['a', 'b'], ['c', 'd']]
>>> my_list * 3
[['a', 'b'], ['c', 'd'], ['a', 'b'], ['c', 'd'], ['a', 'b'], ['c', 'd']]

So we can easily see what would happen with 10 times as many reading sets, or 100 times as many. As the workload increases, I would expect the parallel approaches to become more efficient at some point.

Here’s the new code:

        reading_sets *= 10
        for reading_set in reading_sets:
            generate_interactive_plot(reading_set)

        # with ThreadPoolExecutor(max_workers=2) as executor:
        #     executor.map(generate_interactive_plot, reading_sets)

        # pool = multiprocessing.Pool(processes=2)
        # pool.map(generate_interactive_plot, reading_sets)

Making 10 times as many plots, the non-parallel approach takes 31.76 seconds and the multithreading approach takes 34.03 seconds. But the multiprocessing approach takes only 21.20 seconds.

Here’s a table showing how these different approaches compare for 10 times as many plots, 100x, and 1000x:

Plots      Non-Parallel   Multithreaded (w=2)   MT speedup   Multiprocessing (p=2)   MP speedup
1x (11)    6.2 s          6.7 s                 0.93x        6.5 s                   0.95x
10x        33 s           35 s                  0.94x        20 s                    1.65x
100x       4m 43s         5m 8s                 0.92x        2m 30s                  1.89x
1000x      46m 41s        50m 1s                0.93x        24m 15s                 1.93x

Multiprocessing is the clear winner here. We were originally generating 11 interactive plot files. At roughly 100 files (the 10x run), multiprocessing already gave a speedup of 1.65x. At roughly 1,000 and 10,000 files, the multiprocessing approach was nearly twice as fast as the non-parallel approach.

How to best use parallel processing is not always obvious, especially without a lot of experience with it. It takes consideration of your current use case, the kind of work you’re doing, the volume of work, and the system you’re working on. It also takes some experimentation and probably some re-architecting.

Note: The code for comparing different workloads is at commit 31807e9.

What now?

The additional complexity and overhead of parallel approaches is not worthwhile for this project. But there are some optimization and refactoring approaches left to consider:

- Remove the parallel plotting code.
- Move most of the remaining code out of process_hx_data.py and into utility modules.
- Add command-line arguments so we can run only the parts of the program we need.
- Use cached (pickled) data instead of always parsing the raw data files.

This should leave us with a much more readable and useful main file. There will still be plenty of work to do in the utility files to make a clean overall project, but at least process_hx_data.py will be much clearer. Anyone who wants to dig into the weeds can focus on the part of the project they’re most interested in, but most of the ugliness will be hidden for now. And we have a decent set of initial tests to help support any ongoing optimization and refactoring efforts.

Even though we haven’t sped up the program much more than the initial ~25% improvement, the focus on optimization has led to much cleaner code.

Removing parallel code

First, we’ll remove the parallel plotting code. Remove the import statements for the multithreading and multiprocessing libraries.

Since we’re not using the parallel approach, we can delete the separate generate_interactive_plot() and generate_static_plot() functions. We’ll also get rid of the parallel processing blocks. Here’s what the loop over data_files should look like:

    ...
    for data_file in data_files:
        readings = get_readings_from_data_file(data_file)
        reading_sets = get_reading_sets(
                readings, known_slides, stats)

        # Pickle reading sets for faster analysis and plotting later,
        #   and for use by other programs.
        for reading_set in reading_sets:
            print("Pickling reading sets...")
            pickle_reading_set(reading_set, root_output_directory)

        # Generate interactive plots.
        for reading_set in reading_sets:
            print("Generating interactive plots...")
            critical_points = a_utils.get_critical_points(reading_set)
            ph.plot_data(
                reading_set,
                known_slides=known_slides,
                critical_points=critical_points,
                root_output_directory=root_output_directory)

        # Generate static plots.
        for reading_set in reading_sets:
            print("Generating static plots...")
            critical_points = a_utils.get_critical_points(reading_set)
            ph.plot_data_static(
                reading_set,
                known_slides=known_slides,
                critical_points=critical_points,
                root_output_directory=root_output_directory)
    ...

If you’re following along with all of this, it’s good to run the tests again and make sure everything is working as it should.

Moving code to utils/

Let’s clean up process_hx_data.py by moving most of the code to separate files.

We’ll start by moving the stats dictionary to its own file, utils/stats.py. Here’s that file:

"""Module for tracking stats in historical analysis."""

# Track overall stats.
#   How many notifications were followed by slides?
#   How many notifications were not followed by slides?
#   How many slides were missed?
stats = {
    "notifications_issued": 0,
    "associated_notifications": 0,
    "unassociated_notifications": 0,
    "unassociated_notification_points": [],
    "relevant_slides": [],
    "unassociated_slides": [],
    "notification_times": {},
    "earliest_reading": None,
    "latest_reading": None,
}

Make sure you remove the stats dictionary from process_hx_data.py after saving stats.py. Here’s the import statement we need in process_hx_data.py:

from utils.stats import stats

All tests pass. We haven’t written any tests for the summary output or the stats dictionary, so I ran process_hx_data.py as well and looked over the console output. I made sure it largely matches what I’ve been seeing in previous runs. It would be good to test the final state of stats, rather than the summary output, but I won’t include that test here.

I also copied get_readings_from_data_file(), get_reading_sets(), pickle_reading_set(), and summarize_results() to utils/analysis_utils.py. There were a couple of small changes needed to make this work.

Finally, the import statements and first part of process_hx_data.py are simpler as well because we’re using fewer resources directly in this file:

"""Process all historical IR gauge data.
    ...
"""

import plot_heights as ph
from slide_event import SlideEvent
import utils.analysis_utils as a_utils
from utils.stats import stats


def process_hx_data(root_output_directory=''):
    ...

All tests pass.

This leaves process_hx_data.py much easier to understand. It’s barely over 100 lines, and about half of those are comments and blank lines.

Note: The commit hash for the cleaned-up version at this point is 31c5368.

Building command-line arguments

When I run process_hx_data.py, I usually only need to run some of the code. For example, if I’m focusing on the design of the static plots, I don’t need to generate new interactive plots. Let’s write some command-line arguments that let us run just the parts of the file we need.

The first argument will let us skip generating new interactive plots; the flag we’ll use is --no-interactive-plots. Here’s what we need at the top of process_hx_data.py:

import argparse, sys

import plot_heights as ph
from slide_event import SlideEvent
import utils.analysis_utils as a_utils
from utils.stats import stats


# Define cli arguments.
parser = argparse.ArgumentParser()
parser.add_argument('--no-interactive-plots',
    help="Do not generate interactive plots.",
    action='store_true')
args = parser.parse_args()


def process_hx_data(root_output_directory=''):
...

The action='store_true' setting gives this flag a default value of False, which becomes True when the flag is present. This is a little confusing because we’re working with a negative condition; when this flag is True, we will not generate interactive plots. But this is the behavior we want: when the file is run with no flags, we do everything, and when this flag is present, we skip this step.
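Here’s a standalone snippet (not project code) showing that behavior; note that argparse turns the dashes into underscores for the attribute name:

# Standalone demonstration of how action='store_true' behaves.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--no-interactive-plots',
    help="Do not generate interactive plots.",
    action='store_true')

# Without the flag the value defaults to False; with it, the value is True.
print(parser.parse_args([]).no_interactive_plots)                          # False
print(parser.parse_args(['--no-interactive-plots']).no_interactive_plots)  # True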

Now we’ll check this flag before running the block to generate interactive plots:

        if not args.no_interactive_plots:
            print("Generating interactive plots...")
            for reading_set in reading_sets:
                critical_points = a_utils.get_critical_points(reading_set)
                ph.plot_data(
                    reading_set,
                    known_slides=known_slides,
                    critical_points=critical_points,
                    root_output_directory=root_output_directory)

Running the file with our new flag skips interactive plots:

$ python process_hx_data.py --no-interactive-plots

When running the tests, using pytest’s --disable-warnings flag causes issues, because (I believe) the flag ends up being passed along to the argument parser in process_hx_data.py. If we leave warnings enabled, all the tests pass:

$ python -m pytest
...
========================= 5 passed, 1 warning in 8.72s =========================

This shows that without any flags, the program still does everything it originally did.
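If the interaction with pytest’s flags ever becomes a real problem, one possible workaround (an assumption on my part, not something I did here) is parse_known_args(), which ignores arguments the parser doesn’t recognize:

# Possible workaround, not used here: parse_known_args() returns the
#   recognized arguments plus a list of leftover strings, instead of
#   erroring out on flags meant for another tool such as pytest.
args, unknown_args = parser.parse_known_args()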

We can implement the other flag, --no-static-plots, in the same way. If we want, we can now see how long the file takes to run without generating any plots:

$ time python process_hx_data.py --no-interactive-plots --no-static-plots
...
real  0m2.921s
user  0m2.879s
sys   0m0.139s

It takes less than 3 seconds to run, without refreshing any of the plots. You can also run with either of these flags, and look at the timestamps of the plot files to see that only some of the plots are being regenerated.

Note: The commit hash here is aaeb003.

Using cached (pickled) data

There’s one more significant optimization we can make, related to flags. We can use the pickled data sets instead of always scraping the raw data files.

We’ve cleaned things up enough that we can grab data in a separate step from plotting data:

    ...
    data_files = [
        'ir_data_clean/irva_utc_072014-022016_hx_format.txt',
        'ir_data_clean/irva_akdt_022016-102019_arch_format.txt',
    ]

    reading_sets = []
    for data_file in data_files:
        readings = a_utils.get_readings_from_data_file(data_file)
        reading_sets += a_utils.get_reading_sets(readings, known_slides, stats)

    print("Pickling reading sets...")
    for reading_set in reading_sets:
        # Pickle reading sets for faster analysis and plotting later,
        #   and for use by other programs.
        a_utils.pickle_reading_set(reading_set, root_output_directory)

    if not args.no_interactive_plots:
        print("Generating interactive plots...")
        for reading_set in reading_sets:
            critical_points = a_utils.get_critical_points(reading_set)
            ph.plot_data(
                reading_set,
                known_slides=known_slides,
                critical_points=critical_points,
                root_output_directory=root_output_directory)
    ...

This has the nice effect of reducing the indentation level of the subsequent blocks as well. It’s also a significant logical improvement: instead of loading data from one file, analyzing and plotting it, and then doing the same for the next data file, we load all the data first and then do all the analysis and plotting.

Now we can add another optional flag, --use-cached-data. When this flag is present, we won’t scrape the data files or run the pickling block. We will instead fill reading_sets from the pickled files.

We need some new imports:

...
import argparse, sys, pickle

from os import listdir, path
from pathlib import Path

import plot_heights as ph
...

Here’s the block that adds this flag:

parser.add_argument('--use-cached-data',
    help="Use pickled data; don't parse raw data files.",
    action='store_true')

And here’s the code for filling reading_sets:

def process_hx_data(root_output_directory=''):
    ...

    # Get known slides.
    slides_file = 'known_slides/known_slides.json'
    known_slides = SlideEvent.load_slides(slides_file)

    reading_sets = []

    if not args.use_cached_data:
        print("Parsing raw data files...")
        data_files = [
            'ir_data_clean/irva_utc_072014-022016_hx_format.txt',
            'ir_data_clean/irva_akdt_022016-102019_arch_format.txt',
        ]
        for data_file in data_files:
            readings = a_utils.get_readings_from_data_file(data_file)
            reading_sets += a_utils.get_reading_sets(readings, known_slides, stats)

        print("Pickling reading sets...")
        for reading_set in reading_sets:
            # Pickle reading sets for faster analysis and plotting later,
            #   and for use by other programs.
            a_utils.pickle_reading_set(reading_set, root_output_directory)

    elif args.use_cached_data:
        print("Reading data from pickled files...")
        pkl_file_path = 'other_output/'
        pkl_files = [f for f in listdir(pkl_file_path)
                if path.isfile(path.join(pkl_file_path, f))
                and Path(f).suffix=='.pkl']
        # pkl_files = [f for f in pkl_files if Path(f).suffix=='.pkl']

        for pkl_file in pkl_files:
            filename = f"{pkl_file_path}{pkl_file}"
            with open(filename, 'rb') as f:
                reading_set = pickle.load(f)
                reading_sets.append(reading_set)

    if not args.no_interactive_plots:
        ...

We define an empty reading_sets before choosing how to fill it. If we’re not using cached data, we parse the data files and pickle the data that we find. If we are using cached data, we find all the pkl files, load the data from each file, and add what we find to reading_sets.
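As an aside, pathlib can express the pkl-file search a bit more compactly. Here’s a sketch of an equivalent version, assuming the same pickle import and the reading_sets list from the surrounding code; it’s not the version in the repo:

        # Equivalent pkl-gathering logic using pathlib.
        from pathlib import Path

        pkl_files = sorted(Path('other_output').glob('*.pkl'))
        for pkl_file in pkl_files:
            with open(pkl_file, 'rb') as f:
                reading_sets.append(pickle.load(f))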

When we try using the cached data, we get an error:

$ python process_hx_data.py --use-cached-data
...
  File "/Users/eric/projects/sitka_irg_analysis/utils/analysis_utils.py", line 338, in summarize_results
    if ( (slide.dt_slide < stats['earliest_reading'].dt_reading)
AttributeError: 'NoneType' object has no attribute 'dt_reading'

This is an issue in summarize_results(), and it’s happening because I was doing too much while processing the data files when I first started this project. Fixing it is more than I want to take on in this post; it will involve further separating the work of pulling data from the raw data files from the work of analyzing that data. It’s not particularly difficult work, but writing about it here wouldn’t add much to the main points of this post.

What we can do now is only summarize the data when we’re processing the raw data files. When we generate plots from pickled data, we won’t summarize the results:

    if not args.use_cached_data:
        a_utils.summarize_results(reading_sets, known_slides, stats)

Now we can generate static plots from cached data, much quicker than running the entire program:

$ time python process_hx_data.py --use-cached-data --no-interactive-plots
real  0m3.407s
user  0m4.382s
sys   0m0.381s

This takes about 3.4 seconds. When I’m just working on static plot images, this will be much nicer to work with!

The tests still pass. We’re not testing process_hx_data.py as it runs with any of these cli flags; the most pressing one to test is --use-cached-data, to make sure the processing we were doing over the raw data files isn’t leaving anything out when plotting from the cached data. But for now, we should catch any significant issues just by looking over a few of the output files.

The argument code doesn’t add any significant time to the program’s overall execution when running the full program. The program still takes just under 9 seconds to run without any flags.

Note: The commit hash here is 2c3b5b4.

How much plotting time is really prep time?

Now we can more easily check something I’ve been wondering about. The code that generates the interactive and static plots also does some analysis, and I’ve been curious how much of the plotting time is spent on that analysis, and how much is spent on the actual plotting. With these flags, we can check this much more efficiently.

The cProfile approach looks mostly at times per function call. We can use perf_counter() from the time module to profile code within a function.

Here’s what we’ll add to plot_data() in plot_heights.py:

def plot_data(readings, critical_points=[], known_slides=[],
        root_output_directory='', auto_open=False):
    """Plot IR gauge data, with critical points in red. Known slide
    events are indicated by a vertical line at the time of the event.
    """
    # Start timing as soon as the function begins.
    from time import perf_counter
    start = perf_counter()

    ...
    else:
        # dt_title = datetimes[0].dt_reading.astimezone(aktz)
        title_date_str = datetimes[0]

    finished_analysis = perf_counter()

    data = [
        ...

    fig = {'data': data, 'layout': my_layout}
    filename = f"{root_output_directory}current_ir_plots/ir_plot_{readings[-1].dt_reading.__str__()[:10]}.html"
    offline.plot(fig, filename=filename, auto_open=auto_open)
    print("\nPlotted data.")

    finished_plotting = perf_counter()
    analysis_time = finished_analysis - start
    plotting_time = finished_plotting - finished_analysis
    print(f"Time in analysis: {analysis_time}")
    print(f"Time in plotting: {plotting_time}")

The perf_counter() function returns a reading from the highest-resolution clock available on the system, which makes it well suited to measuring short durations; the values only make sense relative to each other. You can take readings around the code you’re evaluating, and subtract successive readings to get elapsed time.
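Here’s the pattern in isolation, as a generic snippet outside the project code:

# Generic sketch of the perf_counter() timing pattern.
from time import perf_counter

start = perf_counter()
total = sum(i * i for i in range(1_000_000))    # the work being timed
elapsed = perf_counter() - start
print(f"Elapsed: {elapsed:.4f} seconds")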

Here’s the relevant output, which is made easier using the flags we just created:

$ python process_hx_data.py --use-cached-data --no-static-plots
...
Time in analysis: 0.0006291280000001453
Time in plotting: 0.2695389209999999
...
Time in analysis: 0.0013731260000002798
Time in plotting: 0.26619306499999995
...

From these results, we can see that plot_data() is only spending about a thousandth of a second on analysis work. Even if we clean up that work and move it elsewhere, the actual plotting work is what takes so much time. That’s no surprise. I really don’t think we’re going to be able to speed up this time much, without digging into the library code, which I have no interest in doing!

Let’s also see how this plays out in the static plots, which do a little more prep work, calculating the critical regions.

I won’t show the code for this, but it’s structured just like the perf_counter() timing we added to plot_data():

$ python process_hx_data.py --use-cached-data --no-interactive-plots
...
Time in analysis: 0.0007580660000001682
Time in plotting: 0.22037538899999998
...
Time in analysis: 0.0020618690000002715
Time in plotting: 0.23038119099999976

The analysis for the static plots takes a little longer than for the interactive plots, but not enough that we can significantly speed things up through optimization. We should certainly clean things up for readability, but we should know ahead of time that we won’t gain much performance by doing so. Once again, the majority of the time is spent in the plotting library code.

Note: The commit hash here is ced67a1.

Back to strptime()

I took another look at what’s still taking a lot of time in the cProfile results, and I noticed this line again:

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
140583    0.088    0.000    2.070    0.000 {built-in method strptime}

I started wondering whether there was anything I could do about this, because 2.07 seconds seems like a lot of time to spend simply parsing strings. There are a number of ways to convert string-based timestamps to datetime objects, and this investigation led to a separate post, What’s faster than strptime()? The function fromisoformat() is not as flexible as strptime(), but it’s much faster when it does work.
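If you want to see the difference for yourself, here’s a small timeit comparison on a sample timestamp; this sketch isn’t from that separate post, and the exact numbers will vary by system and Python version:

# Minimal comparison of the two parsers on a sample timestamp.
import timeit

setup = "from datetime import datetime; ts = '2019-09-20 14:30:00'"

t_strptime = timeit.timeit(
    "datetime.strptime(ts, '%Y-%m-%d %H:%M:%S')", setup=setup, number=100_000)
t_fromiso = timeit.timeit(
    "datetime.fromisoformat(ts)", setup=setup, number=100_000)

print(f"strptime:      {t_strptime:.3f} seconds")
print(f"fromisoformat: {t_fromiso:.3f} seconds")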

In plot_heights.py I changed this line:

dt = datetime.datetime.strptime(datetime_str, '%Y-%m-%d %H:%M:%S')

to this:

dt = datetime.datetime.fromisoformat(datetime_str)

and this line:

dt_ak = datetime.datetime.strptime(datetime_str, '%Y-%m-%d %H:%M')

to this:

dt_ak = datetime.datetime.fromisoformat(datetime_str)

Here’s the relevant line from running cProfile again:

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
140576    0.022    0.000    0.022    0.000 {built-in method fromisoformat}

We’re still making one call per data point, but the cumulative time has gone from two seconds to two hundredths of a second! This is while profiling, which includes some time that cProfile spends tracking the function.

Running the full program using only time saves over a second:

$ time python process_hx_data.py
...
real  0m7.406s
user  0m8.291s
sys   0m0.472s

Our previous best time for the full program was about 8.8 seconds; now we’re under 7.5 seconds.

Note: The commit hash here is 03667e5.

Other investigations

I looked at a few other things to see if I could gain any more efficiency.

Summary and Conclusions

Without profiling, this program took about 15.7 seconds before we started making any improvements. There was no way to only run part of the program, so every time I ran this file it took about 15 or 16 seconds. The final version runs in about 7.4 seconds, less than half the original execution time. And we don’t have to run the entire program; if we only want to regenerate the static plot files, from cached data, we can run the program in just over 3 seconds. This is how I’ll most often use this program, so we’ve effectively gone from about 15 or 16 seconds to about 3 or 4 seconds. That is a huge and really worthwhile improvement!

I didn’t just optimize this project; I also learned a good deal about using profile output. It really does pay to focus on the calls that take a long time, including calls into library code. Some of these longer calls we’ll accept, because we don’t want to switch libraries. But some library calls are made out of convenience in the early stages of a project, and can be swapped for more efficient code as a project evolves. The focus on profile output also led me toward cleaner code overall. After this first round of optimizations I did a lot of refactoring, much of which had a significant impact on the overall efficiency of the project.

I also learned more about how early decisions about which data structures to use can affect efficiency as a project grows and evolves. It’s not always easy or possible to know how much an exploratory project will grow. When I started this project, I had no idea whether the results would be meaningful, so it made sense not to overthink things at the outset. But in future projects I’ll have a better sense of when to start optimizing, and for some projects I’ll take more time to choose more complex but efficient data structures from the start.

One thing I’ll certainly try to do a better job of in newer projects is keeping functions from growing too large. I tell students and readers their functions should do one task; I should really do a better job of recognizing when I should refactor to make that happen, even in the exploratory phase of a project. Long functions aren’t that hard to deal with, but functions that do many things become hard to work with as a project grows. I still have some work to do in that regard for this project. That probably won’t speed anything up much, but it will definitely make the project easier to work with.

Feedback

I hope you’ve found this helpful, and if you have any feedback please feel free to get in touch. I will correct any mistakes or misunderstandings on my part that are pointed out.