import os
import sys
import glob
import re
import multiprocess as mp2
import multiprocessing as mp
import argparse
import logging
import pandas as pd
from edisgo import EDisGo
from edisgo.network.results import Results
from edisgo.flex_opt.exceptions import MaximumIterationError
def setup_logging(
    logfilename=None,
    logfile_loglevel="debug",
    console_loglevel="info",
    **logging_kwargs
):
    """
    Configure file logging on the root logger and attach a console handler.

    Parameters
    ----------
    logfilename : str or None
        Path of the log file. An empty value falls back to "edisgo_run.log".
    logfile_loglevel : str
        Level name passed to :func:`logging.basicConfig`. One of 'debug',
        'info', 'warn', 'warning', 'error' or 'critical'.
    console_loglevel : str
        Level name applied to the console stream handler (same choices).
    **logging_kwargs
        Accepted for call compatibility; not evaluated by this function.

    Returns
    -------
    :class:`logging.Logger`
        The root logger with the console handler attached.

    Raises
    ------
    KeyError
        If one of the level names is not among the supported choices.
    """
    # translation table from level names to logging constants
    levels = {
        "info": logging.INFO,
        "debug": logging.DEBUG,
        "warn": logging.WARNING,
        "warning": logging.WARNING,
        "error": logging.ERROR,
        "critical": logging.CRITICAL,
    }
    # file and console output share the same record layout
    record_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    date_format = "%m/%d/%Y %H:%M:%S"

    logging.basicConfig(
        filename=logfilename or "edisgo_run.log",
        format=record_format,
        datefmt=date_format,
        level=levels[logfile_loglevel],
    )

    root = logging.getLogger()
    console_handler = logging.StreamHandler()
    console_handler.setLevel(levels[console_loglevel])
    console_handler.setFormatter(
        logging.Formatter(fmt=record_format, datefmt=date_format)
    )
    # the console handler is added on top of whatever basicConfig installed
    root.addHandler(console_handler)
    return root
def _get_griddistrict(ding0_filepath):
"""
Just get the network district number from ding0 data file path
Parameters
----------
ding0_filepath : str
Path to ding0 data ending typically
`/path/to/ding0_data/"ding0_grids__" + str(``grid_district``) + ".xxx"`
Returns
-------
int
grid_district number
"""
grid_district = os.path.basename(ding0_filepath)
grid_district_search = re.search("[_]+\d+", grid_district)
if grid_district_search:
grid_district = int(grid_district_search.group(0)[2:])
return grid_district
else:
raise (KeyError("Grid District not found in ".format(grid_district)))
def run_edisgo_basic(
    ding0_filepath,
    generator_scenario=None,
    analysis="worst-case",
    *edisgo_grid
):
    """
    Analyze edisgo network extension cost as reference scenario

    ToDo: adapt to refactored code! Until that is done every call raises
    :exc:`NotImplementedError`. The previous implementation is kept below
    the ``raise`` as a reference for the port and is never executed.

    Parameters
    ----------
    ding0_filepath : str
        Path to ding0 data ending typically
        `/path/to/ding0_data/"ding0_grids__" + str(``grid_district``) + ".xxx"`
    generator_scenario : None or :obj:`str`
        If provided defines which scenario of future generator park to use
        and invokes import of these generators. Possible options are 'nep2035'
        and 'ego100'.
    analysis : str
        Either 'worst-case' or 'timeseries'
    *edisgo_grid
        Optional, already set up eDisGo object. If given, its first entry is
        used instead of building a new object from ``ding0_filepath``.

    Returns
    -------
    edisgo_grid : :class:`~.network.network.EDisGo`
        eDisGo network container
    costs : :pandas:`pandas.Dataframe<DataFrame>`
        Cost of network extension
    grid_issues : dict
        Grids resulting in an error including error message

    Raises
    ------
    NotImplementedError
        Always, see ToDo above.
    """
    raise NotImplementedError(
        "run_edisgo_basic has not been adapted to the refactored code yet."
    )

    # ------------------------------------------------------------------
    # Legacy implementation (intentionally unreachable, see docstring).
    # ------------------------------------------------------------------
    grid_district = _get_griddistrict(ding0_filepath)
    grid_issues = {}
    logging.info(
        "Grid expansion for MV network district {}".format(grid_district)
    )

    if edisgo_grid:
        # an eDisGo object was handed in, so no new one is built from file
        edisgo_grid = edisgo_grid[0]
    else:
        try:
            if "worst-case" in analysis:
                edisgo_grid = EDisGo(
                    ding0_grid=ding0_filepath, worst_case_analysis=analysis
                )
            elif "timeseries" in analysis:
                edisgo_grid = EDisGo(
                    ding0_grid=ding0_filepath,
                    timeseries_generation_fluctuating="oedb",
                    timeseries_load="demandlib",
                )
            else:
                # without this branch edisgo_grid would stay an empty tuple
                # and fail later with an unrelated AttributeError
                raise ValueError(
                    "Unknown analysis mode '{}'.".format(analysis)
                )
        except FileNotFoundError as e:
            return (
                None,
                pd.DataFrame(),
                {"network": grid_district, "msg": str(e)},
            )

    # Import generators
    if generator_scenario:
        logging.info(
            "Grid expansion for scenario '{}'.".format(generator_scenario)
        )
        edisgo_grid.import_generators(generator_scenario=generator_scenario)
    else:
        logging.info(
            "Grid expansion with no generator imports based on scenario"
        )

    try:
        # Do network reinforcement
        edisgo_grid.reinforce()

        # Get costs, summed per equipment type and labelled with the
        # network id
        costs_grouped = (
            edisgo_grid.network.results.grid_expansion_costs.groupby(
                ["type"]
            ).sum()
        )
        costs = pd.DataFrame(
            costs_grouped.values,
            columns=costs_grouped.columns,
            index=[
                [edisgo_grid.network.id] * len(costs_grouped),
                costs_grouped.index,
            ],
        ).reset_index()
        costs.rename(columns={"level_0": "network"}, inplace=True)

        grid_issues["network"] = None
        grid_issues["msg"] = None
        logging.info("SUCCESS!")
    except MaximumIterationError:
        grid_issues["network"] = edisgo_grid.network.id
        grid_issues["msg"] = str(edisgo_grid.network.results.unresolved_issues)
        costs = pd.DataFrame()
        logging.warning("Unresolved issues left after network expansion.")
    except Exception as e:
        grid_issues["network"] = edisgo_grid.network.id
        grid_issues["msg"] = repr(e)
        costs = pd.DataFrame()
        # logging.exception requires a message argument
        logging.exception("Unexpected error during network expansion.")

    return edisgo_grid, costs, grid_issues
def run_edisgo_twice(run_args):
    """
    Run network analysis twice on same network: once w/ and once w/o new generators

    ToDo: adapt to refactored code! Until that is done every call raises
    :exc:`NotImplementedError`. The previous implementation is kept below
    the ``raise`` as a reference for the port and is never executed.

    First run without connection of new generators approves sufficient network
    hosting capacity. Otherwise, network is reinforced.
    Second run assessment network extension needs in terms of RES integration

    Parameters
    ----------
    run_args : list
        Positional arguments for :func:`run_edisgo_basic`. The list is not
        modified.

    Returns
    -------
    all_costs_before_geno_import : :pandas:`pandas.Dataframe<DataFrame>`
        Grid extension cost before network connection of new generators
    all_grid_issues_before_geno_import : dict
        Remaining overloading or over-voltage issues in network
    all_costs : :pandas:`pandas.Dataframe<DataFrame>`
        Grid extension cost due to network connection of new generators
    all_grid_issues : dict
        Remaining overloading or over-voltage issues in network

    Raises
    ------
    NotImplementedError
        Always, see ToDo above.
    """
    raise NotImplementedError(
        "run_edisgo_twice has not been adapted to the refactored code yet."
    )

    # ------------------------------------------------------------------
    # Legacy implementation (intentionally unreachable, see docstring).
    # ------------------------------------------------------------------
    # base case with no generator import
    (
        edisgo_grid,
        costs_before_geno_import,
        grid_issues_before_geno_import,
    ) = run_edisgo_basic(*run_args)

    if not edisgo_grid:
        # the first run produced no grid object, so its results are
        # reported for both cases
        return (
            costs_before_geno_import,
            grid_issues_before_geno_import,
            costs_before_geno_import,
            grid_issues_before_geno_import,
        )

    # clear the pypsa object and results from edisgo_grid
    edisgo_grid.network.results = Results(edisgo_grid.network)
    edisgo_grid.network.pypsa = None

    # case after generator import; the grid is passed as an extra positional
    # argument instead of being appended to the caller's list
    _, costs, grid_issues = run_edisgo_basic(*run_args, edisgo_grid)

    return (
        costs_before_geno_import,
        grid_issues_before_geno_import,
        costs,
        grid_issues,
    )
def run_edisgo_pool(
    ding0_file_list,
    run_args_opt=[None, "worst-case"],
    workers=mp.cpu_count(),
    worker_lifetime=1,
):
    """
    Use python multiprocessing toolbox for parallelization

    Several grids are analyzed in parallel. Every file is handed to
    :func:`run_edisgo_twice` in a worker process. Runs that raise are logged
    and left out of the returned lists.

    Parameters
    ----------
    ding0_file_list : list
        Ding0 network data file names
    run_args_opt : list
        eDisGo options, see :func:`run_edisgo_basic` and
        :func:`run_edisgo_twice`, has to contain generator_scenario and
        analysis as entries. The default list is only read, never modified.
    workers: int
        Number of parallel process
    worker_lifetime : int
        Bunch of grids sequentially analyzed by a worker

    Returns
    -------
    all_costs_before_geno_import : list
        Grid extension cost before network connection of new generators
    all_grid_issues_before_geno_import : list
        Remaining overloading or over-voltage issues in network
    all_costs : list
        Grid extension cost due to network connection of new generators
    all_grid_issues : list
        Remaining overloading or over-voltage issues in network

    Notes
    -----
    Results are collected in the order in which the workers finish, which
    is not necessarily the order of ``ding0_file_list``.
    """
    results = []

    def make_error_logger(ding0_file):
        # factory so that every task reports its own file name
        def log_error(error):
            logging.error(
                "eDisGo run for '%s' failed: %r", ding0_file, error
            )

        return log_error

    # The context manager releases the workers even if submitting a task
    # fails; close/join inside the block waits for all tasks to finish.
    with mp.Pool(workers, maxtasksperchild=worker_lifetime) as pool:
        for ding0_file in ding0_file_list:
            pool.apply_async(
                func=run_edisgo_twice,
                args=([ding0_file, *run_args_opt],),
                callback=results.append,
                error_callback=make_error_logger(ding0_file),
            )
        pool.close()
        pool.join()

    # process results data
    all_costs_before_geno_import = [r[0] for r in results]
    all_grid_issues_before_geno_import = [r[1] for r in results]
    all_costs = [r[2] for r in results]
    all_grid_issues = [r[3] for r in results]

    return (
        all_costs_before_geno_import,
        all_grid_issues_before_geno_import,
        all_costs,
        all_grid_issues,
    )
def run_edisgo_pool_flexible(
    ding0_id_list,
    func,
    func_arguments,
    workers=mp2.cpu_count(),
    worker_lifetime=1,
):
    """
    Run a custom eDisGo function for several grids in parallel

    Every ID in ``ding0_id_list`` is submitted as a separate task to a
    ``multiprocess`` pool.

    Parameters
    ----------
    ding0_id_list : list of int
        List of ding0 network data IDs (also known as HV/MV substation IDs)
    func : any function
        Your custom function that shall be parallelized
    func_arguments : tuple
        Arguments to custom function ``func``
    workers: int
        Number of parallel process
    worker_lifetime : int
        Bunch of grids sequentially analyzed by a worker

    Notes
    -----
    Requirements for the custom function which is executed in parallel:

    #. It must return an instance of the type :class:`~.edisgo.EDisGo`.
    #. Its first positional argument is the MV network district id (as int),
       which is prepended to ``func_arguments``.

    Returns
    -------
    containers : dict
        Successful runs are stored under the ``network.id`` of the returned
        object. For a failed run the error object is stored under the ding0
        ID the task was started with.
    """
    containers = {}

    def store_container(edisgo_obj):
        """
        Store the result of a finished task under its network ID

        Parameters
        ----------
        edisgo_obj: :class:`~.edisgo.EDisGo`
        """
        containers[edisgo_obj.network.id] = edisgo_obj

    def make_error_handler(grid_id):
        # one handler per task so the error ends up under the right ID
        def store_error(error):
            containers[grid_id] = error

        return store_error

    worker_pool = mp2.Pool(workers, maxtasksperchild=worker_lifetime)

    for grid_id in ding0_id_list:
        worker_pool.apply_async(
            func=func,
            args=(grid_id,) + tuple(func_arguments),
            callback=store_container,
            error_callback=make_error_handler(grid_id),
        )

    worker_pool.close()
    worker_pool.join()

    return containers
def _build_edisgo_run_parser():
    """Create the command line parser used by :func:`edisgo_run`."""
    example_text = """Examples
...assumes all files located in PWD.
Analyze a single network in 'worst-case'
edisgo_run -f ding0_grids__997.pkl -wc
Analyze multiple grids in 'worst-case' using parallelization. Grid IDs are
specified by the grids_list.txt.
edisgo_run -ds '' grids_list.txt ding0_grids__{}.pkl -wc --parallel
"""
    parser = argparse.ArgumentParser(
        description="Commandline running of eDisGo",
        epilog=example_text,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )

    # exactly one way of selecting the input files has to be given
    ding0_files_parsegroup = parser.add_mutually_exclusive_group(required=True)
    ding0_files_parsegroup.add_argument(
        "-f",
        "--ding0-file-path",
        type=str,
        action="store",
        dest="ding0_filename",
        help="Path to a single ding0 file.",
    )
    ding0_files_parsegroup.add_argument(
        "-d",
        "--ding0-files-directory",
        type=str,
        action="store",
        dest="ding0_dirglob",
        help="Path to a directory of ding0 files "
        "along with a file name pattern for glob input.",
    )
    ding0_files_parsegroup.add_argument(
        "-ds",
        "--ding0-files-directory-selection",
        type=str,
        nargs=3,
        action="store",
        dest="ding0_dir_select",
        help="Path to a directory of ding0 files, "
        "Path to file with list of network district numbers "
        "(one number per line), "
        "and file name template using {} where number "
        "is to be inserted . Convention is to use "
        "a double underscore before network district number "
        " like so '__{}'.",
    )

    analysis_parsegroup = parser.add_mutually_exclusive_group()
    analysis_parsegroup.add_argument(
        "-wc",
        "--worst-case",
        action="store_true",
        help="Performs a worst-case simulation with a single snapshot",
    )
    analysis_parsegroup.add_argument(
        "-ts",
        "--timeseries",
        action="store_true",
        help="Performs a worst-case simulation with a time-series",
    )

    parser.add_argument(
        "-s",
        "--scenario",
        type=str,
        default=None,
        choices=[None, "nep2035", "ego100"],
        help="'None' or 'string'\n"
        "If provided defines which scenario "
        "of future generator park to use "
        "and invokes import of these generators.\n"
        "Possible options are 'nep2035' and 'ego100'.",
    )
    parser.add_argument(
        "-o",
        "--output-dir",
        nargs="?",
        metavar="/path/to/output/",
        dest="out_dir",
        type=str,
        default=os.path.join(sys.path[0]),
        help="Absolute path to results data location.",
    )
    parser.add_argument(
        "-p",
        "--parallel",
        action="store_true",
        help="Parallel execution of multiple "
        "grids. Parallelization is provided "
        "by multiprocessing.",
    )
    parser.add_argument(
        "-w",
        "--workers",
        nargs="?",
        metavar="1..inf",
        dest="workers",
        type=int,
        default=mp.cpu_count(),
        help="Number of workers in parallel. In other words, "
        "cores that are used for parallelization.",
    )
    parser.add_argument(
        "-lw",
        "--lifetime-workers",
        nargs="?",
        metavar="1..inf",
        dest="worker_lifetime",
        type=int,
        default=None,
        help="Lifetime of a worker of the cluster doing the "
        "work. The lifetime is given is number of jobs a"
        " worker does before it is replaced by a freshly "
        "new one. "
        "The default sets the lifetime to the pools "
        "lifetime. This can cause memory issues!",
    )
    return parser


def _resolve_ding0_files(args):
    """Return the list of ding0 files selected by the parsed arguments."""
    if args.ding0_filename:
        return [args.ding0_filename]
    if args.ding0_dirglob:
        return glob.glob(args.ding0_dirglob)
    if args.ding0_dir_select:
        directory, district_list_path, name_template = args.ding0_dir_select
        with open(district_list_path, "r") as file_handle:
            # one district number per line; blank lines are skipped
            district_numbers = [
                line.strip() for line in file_handle if line.strip()
            ]
        # os.path.join also works for directories given without a trailing
        # separator and for the empty directory used in the epilog example
        return [
            os.path.join(directory, name_template.format(number))
            for number in district_numbers
        ]
    raise FileNotFoundError(
        "Some of the Arguments for input files are missing."
    )


def _concat_costs(cost_frames):
    """Concatenate cost tables; an empty input gives an empty DataFrame."""
    if not cost_frames:
        # pd.concat raises ValueError when given no objects
        return pd.DataFrame()
    return pd.concat(cost_frames, ignore_index=True)


def _write_costs_csv(path, costs):
    """Append the units header line and the cost table to ``path``."""
    with open(path, "a") as f:
        f.write(",,,# units: length in km,, total_costs in kEUR\n")
        costs.to_csv(f, index=False)


def edisgo_run():
    """
    Command line entry point running eDisGo for one or several ding0 grids

    Parses ``sys.argv``, sets up logging to "test.log", runs
    :func:`run_edisgo_twice` for every selected ding0 file (serially or via
    :func:`run_edisgo_pool` when ``--parallel`` is given) and writes four
    CSV files into the output directory. The file names are prefixed with
    the execution time and end in ``grid_issues_before_geno_import.csv``,
    ``costs_before_geno_import.csv``, ``grid_issues.csv`` and ``costs.csv``.
    """
    # imported here because the module header does not provide datetime
    from datetime import datetime

    args = _build_edisgo_run_parser().parse_args(sys.argv[1:])

    # get current time for output file names
    exec_time = datetime.now().strftime("%Y-%m-%d_%H%M")

    logger = setup_logging(
        logfilename="test.log",
        logfile_loglevel="debug",
        console_loglevel="info",
    )

    # get the list of files to run on
    ding0_file_list = _resolve_ding0_files(args)

    # positional options handed on to run_edisgo_basic:
    # [generator_scenario, analysis]
    run_args_opt_no_scenario = [None]
    if args.worst_case:
        run_args_opt_no_scenario.append("worst-case")
    elif args.timeseries:
        run_args_opt_no_scenario.append("timeseries")

    if args.scenario:
        # NOTE(review): the selected scenario is parsed but has never been
        # passed on to the runs; make that visible instead of ignoring it.
        logger.warning(
            "Generator scenario '%s' is not forwarded to the analysis runs.",
            args.scenario,
        )

    if not args.parallel:
        # this is the serial version of the run system
        all_costs_before_geno_import = []
        all_grid_issues_before_geno_import = {"network": [], "msg": []}
        all_costs = []
        all_grid_issues = {"network": [], "msg": []}

        for ding0_filename in ding0_file_list:
            # fails early for file names without a district number
            _get_griddistrict(ding0_filename)

            (
                costs_before_geno_import,
                grid_issues_before_geno_import,
                costs,
                grid_issues,
            ) = run_edisgo_twice([ding0_filename, *run_args_opt_no_scenario])

            all_costs_before_geno_import.append(costs_before_geno_import)
            all_costs.append(costs)
            for key in ("network", "msg"):
                all_grid_issues_before_geno_import[key].append(
                    grid_issues_before_geno_import[key]
                )
                all_grid_issues[key].append(grid_issues[key])
    else:
        (
            all_costs_before_geno_import,
            all_grid_issues_before_geno_import,
            all_costs,
            all_grid_issues,
        ) = run_edisgo_pool(
            ding0_file_list,
            run_args_opt_no_scenario,
            args.workers,
            args.worker_lifetime,
        )

    # consolidate costs for all the networks
    all_costs_before_geno_import = _concat_costs(all_costs_before_geno_import)
    all_costs = _concat_costs(all_costs)

    def output_path(suffix):
        # join directory and file name explicitly; plain concatenation
        # breaks for directories given without a trailing separator
        return os.path.join(args.out_dir, exec_time + "_" + suffix)

    # write costs and error messages to csv files
    pd.DataFrame(all_grid_issues_before_geno_import).dropna(
        axis=0, how="all"
    ).to_csv(
        output_path("grid_issues_before_geno_import.csv"),
        index=False,
    )
    _write_costs_csv(
        output_path("costs_before_geno_import.csv"),
        all_costs_before_geno_import,
    )

    pd.DataFrame(all_grid_issues).dropna(axis=0, how="all").to_csv(
        output_path("grid_issues.csv"), index=False
    )
    _write_costs_csv(output_path("costs.csv"), all_costs)
if __name__ == "__main__":
    # Executing this module as a script performs no action.
    # NOTE(review): the help epilog of edisgo_run shows `edisgo_run` being
    # invoked as a command, so the CLI is presumably registered as a console
    # entry point elsewhere - confirm before calling edisgo_run() here.
    pass