def process(args: argparse.Namespace | None = None) -> None: # noqa: C901
"""
Find and process all files.
Parameters
----------
args : None
Arguments.
"""
config, img_files = _parse_configuration(args)
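    # Freeze the per-stage configuration into process_scan so that pool workers only need the per-image data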
processing_function = partial(
process_scan,
base_dir=config["base_dir"],
filter_config=config["filter"],
grains_config=config["grains"],
grainstats_config=config["grainstats"],
disordered_tracing_config=config["disordered_tracing"],
nodestats_config=config["nodestats"],
ordered_tracing_config=config["ordered_tracing"],
splining_config=config["splining"],
curvature_config=config["curvature"],
plotting_config=config["plotting"],
output_dir=config["output_dir"],
)
# Ensure we load the original images as we are running the whole pipeline
if config["file_ext"] == ".topostats":
config["loading"]["extract"] = "raw"
all_scan_data = LoadScans(img_files, **config["loading"])
all_scan_data.get_data()
    # all_scan_data.img_dict maps image names (keys) to their individual image-data dictionaries (values)
scan_data_dict = all_scan_data.img_dict
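    # Process images in parallel; imap_unordered yields results as workers finish, in no particular
    # order, which is why each result is keyed by its image name below.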
with Pool(processes=config["cores"]) as pool:
        # Per-image results, keyed by image name
        results = {}
        image_stats_all = {}
        mols_results = {}
        disordered_trace_results = {}
        height_profile_all = {}
with tqdm(
total=len(img_files),
desc=f"Processing images from {config['base_dir']}, results are under {config['output_dir']}",
) as pbar:
for (
img,
result,
height_profiles,
individual_image_stats_df,
disordered_trace_result,
mols_result,
) in pool.imap_unordered(
processing_function,
scan_data_dict.values(),
):
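                # Drop columns that are entirely NaN from each per-image dataframe before storing it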
results[str(img)] = result.dropna(axis=1, how="all")
disordered_trace_results[str(img)] = disordered_trace_result.dropna(axis=1, how="all")
mols_results[str(img)] = mols_result.dropna(axis=1, how="all")
pbar.update()
                # Add this image's summary statistics dataframe, keyed by image name
                image_stats_all[str(img)] = individual_image_stats_df.dropna(axis=1, how="all")
                # Collect this image's height profiles; they are written out together later
                height_profile_all[str(img)] = height_profiles
# Display completion message for the image
LOGGER.info(f"[{img.name}] Processing completed.")
LOGGER.info(f"Saving image stats to : {config['output_dir']}/image_stats.csv.")
# Concatenate all the dictionary's values into a dataframe. Ignore the keys since
# the dataframes have the file names in them already.
image_stats_all_df = pd.concat(image_stats_all.values())
image_stats_all_df.to_csv(config["output_dir"] / "image_stats.csv")
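    # pd.concat raises ValueError when there is nothing to concatenate, e.g. if no grains
    # were detected in any image.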
try:
results = pd.concat(results.values())
except ValueError as error:
LOGGER.error("No grains found in any images, consider adjusting your thresholds.")
LOGGER.error(error)
try:
disordered_trace_results = pd.concat(disordered_trace_results.values())
except ValueError as error:
LOGGER.error("No skeletons found in any images, consider adjusting disordered tracing parameters.")
LOGGER.error(error)
try:
mols_results = pd.concat(mols_results.values())
except ValueError as error:
LOGGER.error("No mols found in any images, consider adjusting ordered tracing / splining parameters.")
LOGGER.error(error)
# If requested save height profiles
if config["grainstats"]["extract_height_profile"]:
LOGGER.info(f"Saving all height profiles to {config['output_dir']}/height_profiles.json")
dict_to_json(data=height_profile_all, output_dir=config["output_dir"], filename="height_profiles.json")
# Summary Statistics and Plots
if config["summary_stats"]["run"]:
        # Load and validate the summary plots/statistics configuration. Its location is taken from the command
        # line arguments, then from any config file given; if neither is provided the default
        # topostats/summary_config.yaml is loaded.
        if args is not None and args.summary_config is not None:
summary_config = read_yaml(args.summary_config)
elif config["summary_stats"]["config"] is not None:
summary_config = read_yaml(config["summary_stats"]["config"])
else:
summary_yaml = (resources.files(__package__) / "summary_config.yaml").read_text()
summary_config = yaml.safe_load(summary_yaml)
# Do not pass command line arguments to toposum as they clash with process command line arguments
summary_config = update_config(summary_config, config["plotting"])
validate_config(summary_config, SUMMARY_SCHEMA, config_type="YAML summarisation config")
# We never want to load data from CSV as we are using the data that has just been processed.
summary_config.pop("csv_file")
# Load variable to label mapping
plotting_yaml = (resources.files(__package__) / "var_to_label.yaml").read_text()
summary_config["var_to_label"] = yaml.safe_load(plotting_yaml)
LOGGER.info("[plotting] Default variable to labels mapping loaded.")
        # If there is no dataframe, or there is one but it is entirely NaN, there is nothing to plot
if isinstance(results, pd.DataFrame) and not results.isna().values.all():
if results.shape[0] > 1:
                # Summary output always goes in a sub-directory of the main output directory;
                # create it if it does not already exist.
                summary_config["output_dir"] = config["output_dir"] / "summary_distributions"
                summary_config["output_dir"].mkdir(parents=True, exist_ok=True)
                LOGGER.info(f"Summary plots and statistics will be saved to: {summary_config['output_dir']}")
# Plot summaries
summary_config["df"] = results.reset_index()
toposum(summary_config)
else:
                LOGGER.warning(
                    "Fewer than two grains have been detected, so summary plots cannot be made for this image."
                )
else:
            LOGGER.warning(
                "There are no results to plot, either...\n\n"
                "* you have disabled grains/grainstats etc.\n"
                "* no grains have been detected across all scans.\n"
                "* there have been errors.\n\n"
                "If you are not expecting to detect grains please consider disabling "
                "grains/grainstats/plotting/summary_stats. If you are expecting to detect grains"
                " please check log-files for further information."
            )
else:
summary_config = None
# Write statistics to CSV if there is data.
if isinstance(results, pd.DataFrame) and not results.isna().values.all():
results.reset_index(drop=True, inplace=True)
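        # A (image, threshold, grain_number) index uniquely identifies each grain's row in the CSV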
results.set_index(["image", "threshold", "grain_number"], inplace=True)
results.to_csv(config["output_dir"] / "all_statistics.csv", index=True)
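        # Also write statistics broken down by the folder each image came from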
save_folder_grainstats(config["output_dir"], config["base_dir"], results, "grain_stats")
results.reset_index(inplace=True) # So we can access unique image names
images_processed = len(results["image"].unique())
else:
images_processed = 0
LOGGER.warning("There are no grainstats statistics to write to CSV.")
if isinstance(disordered_trace_results, pd.DataFrame) and not disordered_trace_results.isna().values.all():
        disordered_trace_results.reset_index(drop=True, inplace=True)
disordered_trace_results.set_index(["image", "threshold", "grain_number"], inplace=True)
disordered_trace_results.to_csv(config["output_dir"] / "all_disordered_segment_statistics.csv", index=True)
save_folder_grainstats(
config["output_dir"], config["base_dir"], disordered_trace_results, "disordered_trace_stats"
)
disordered_trace_results.reset_index(inplace=True) # So we can access unique image names
else:
LOGGER.warning("There are no disordered tracing statistics to write to CSV.")
if isinstance(mols_results, pd.DataFrame) and not mols_results.isna().values.all():
mols_results.reset_index(drop=True, inplace=True)
mols_results.set_index(["image", "threshold", "grain_number"], inplace=True)
mols_results.to_csv(config["output_dir"] / "all_mol_statistics.csv", index=True)
save_folder_grainstats(config["output_dir"], config["base_dir"], mols_results, "mol_stats")
mols_results.reset_index(inplace=True) # So we can access unique image names
else:
LOGGER.warning("There are no molecule tracing statistics to write to CSV.")
    # Write config to file, first dropping the plot_dict which is populated at runtime
    config["plotting"].pop("plot_dict")
write_yaml(config, output_dir=config["output_dir"])
LOGGER.debug(f"Images processed : {images_processed}")
    # Update config with plotting defaults and print the completion message
completion_message(config, img_files, summary_config, images_processed)