Source code for milton_maps.process_crash_data

import logging
from pathlib import Path

import geopandas as gpd
import pandas as pd
from shapely.geometry import Point

logger = logging.getLogger("process_crash_data")

INJURY_MAP = {
    "No injury": "No Injury",
    "Non-fatal injury - Possible": "Minor Injury",
    "Non-fatal injury - Non-incapacitating": "Minor Injury",
    "Non-fatal injury - Incapacitating": "Major Injury",
    "Not reported": "Unknown",
    "Fatal injury (K)": "Fatal Injury",
    "Unknown": "Unknown",
    "Not Applicable": "No Injury",
    "Deceased not caused by crash": "No Injury",
    "No Apparent Injury (O)": "No Injury",
    "Suspected Minor Injury (B)": "Minor Injury",
    "Possible Injury (C)": "Minor Injury",
    "Suspected Serious Injury (A)": "Major Injury",
}

# define project_root Path object
ROOT_DIR = Path(__file__).parent.parent


[docs]def get_milton_boundaries(): town_boundaries = gpd.read_file( ROOT_DIR / "data/processed/town_boundaries.shp.zip" ).set_index("TOWN_ID") milton_boundaries = town_boundaries[town_boundaries.TOWN.isin(["MILTON"])] return milton_boundaries
[docs]def get_crash_data(milton_boundaries=None) -> gpd.GeoDataFrame: if not milton_boundaries: milton_boundaries = get_milton_boundaries() # Read in crash data, which is in a CSV file with CRLF line endings, skipping 2 (??) rows. crash_data = pd.read_csv( ROOT_DIR / "data/raw/MiltonCrashDetails.csv", skiprows=2, dtype={ "Maximum_Injury_Severity_Reported": "category", "Crash_Severity": "category", "At_Roadway_Intersection": "category", }, ) crash_data.shape # combine text Crash_Date and Crash_Time fields into a single datetime field, assuming EST timezone. crash_data["Crash_DateTime"] = pd.to_datetime( crash_data["Crash_Date"] + " " + crash_data["Crash_Time"], utc=True ).dt.tz_convert("EST") crash_data["year"] = crash_data["Crash_DateTime"].dt.year crash_data["severity"] = ( crash_data["Maximum_Injury_Severity_Reported"].fillna("Unknown").map(INJURY_MAP) ) """Data transformation notes: - [x] Need to parse Crash Date + Crash Time into a datetime field. - [ ] Transform Crash Severity into a categorical variable. - [ ] Transform Manner of Colission into a categorical variable. """ logger.info( f"Found {crash_data[crash_data['X_Cooordinate'].isnull() | crash_data['Y_Cooordinate'].isnull()].shape[0]} records missing coordinates, and will be dropped" ) crash_data = crash_data.dropna(subset=["X_Cooordinate", "Y_Cooordinate"]) geometry = crash_data.apply( lambda row: Point(row["X_Cooordinate"], row["Y_Cooordinate"]), axis=1 ) crash_geodf = gpd.GeoDataFrame(data=crash_data, geometry=geometry, crs="EPSG:26986") # apply `milton_boundaries` as a mask to the crash_geodf crash_geodf2 = gpd.clip(crash_geodf, milton_boundaries) # determine how many records were dropped by the geoclip logger.info( f"Found {crash_geodf.shape[0] - crash_geodf2.shape[0]} records outside Milton were dropped by the geoclip." ) return crash_geodf2
[docs]def get_randolph_ave_shape(): massdot_roads = gpd.read_file(ROOT_DIR / "data/raw/MassDOT_Roads_SHP.zip") randolph_ave = massdot_roads.loc[ (massdot_roads.RT_NUMBER == "28") & (massdot_roads.STREET_NAM.str.lower().str.contains("randolph")) ] return randolph_ave
[docs]def randolph_ave_upstream_vs_intersection(crash_geodf=None, randolph_ave=None): if randolph_ave is None: randolph_ave = get_randolph_ave_shape() if crash_geodf is None: crash_geodf = get_crash_data() # Filter crash points to those within 20 meters of randolph avenue line randolph_ave_buffer = randolph_ave.buffer(20) # Crash points inside the randolph_ave_buffer randolph_ave_crashes = gpd.clip(crash_geodf, randolph_ave_buffer) # Create a polygon buffer of 30 meters around the latitude longitude (42.224225, -71.070639), which is the Chickatawbutt/Randolph intersection. # in the same CRS as the crash data. chickatawbut_randolph_intersection = ( gpd.GeoSeries( Point(-71.070639, 42.224225), crs="EPSG:4326" # Latitude/Longitude CRS ) .to_crs("EPSG:26986") # This CRS measures units in meters .buffer(20) ) intersection_crashes = gpd.clip(crash_geodf, chickatawbut_randolph_intersection) logger.info(f"Intersection crashes shape: {intersection_crashes.shape}") upstream_crashes = randolph_ave_crashes.loc[ ~randolph_ave_crashes.index.isin(intersection_crashes.index), : ] logger.info(f"Upstream crashes shape: {upstream_crashes.shape}") intersection_crashes["where"] = "intersection" upstream_crashes["where"] = "upstream" combined_crashes = gpd.GeoDataFrame( pd.concat([intersection_crashes, upstream_crashes], ignore_index=True) ) return intersection_crashes, upstream_crashes, combined_crashes