from __future__ import annotations
import logging
import os
from math import pi, sqrt
import networkx as nx
import numpy as np
import pandas as pd
from sqlalchemy import func
from edisgo.flex_opt import check_tech_constraints, exceptions
from edisgo.tools import session_scope
if "READTHEDOCS" not in os.environ:
import geopandas as gpd
from egoio.db_tables import climate
from shapely.geometry.multipolygon import MultiPolygon
from shapely.wkt import loads as wkt_loads
logger = logging.getLogger(__name__)
[docs]def select_worstcase_snapshots(edisgo_obj):
"""
Select two worst-case snapshots from time series
Two time steps in a time series represent worst-case snapshots. These are
1. Maximum Residual Load: refers to the point in the time series where the
(load - generation) achieves its maximum.
2. Minimum Residual Load: refers to the point in the time series where the
(load - generation) achieves its minimum.
These two points are identified based on the generation and load time
series. In case load or feed-in case don't exist None is returned.
Parameters
----------
edisgo_obj : :class:`~.EDisGo`
Returns
-------
:obj:`dict`
Dictionary with keys 'min_residual_load' and 'max_residual_load'.
Values are corresponding worst-case snapshots of type
:pandas:`pandas.Timestamp<Timestamp>`.
"""
residual_load = edisgo_obj.timeseries.residual_load
timestamp = {
"min_residual_load": residual_load.idxmin(),
"max_residual_load": residual_load.idxmax(),
}
return timestamp
[docs]def calculate_relative_line_load(edisgo_obj, lines=None, timesteps=None):
"""
Calculates relative line loading for specified lines and time steps.
Line loading is calculated by dividing the current at the given time step
by the allowed current.
Parameters
----------
edisgo_obj : :class:`~.EDisGo`
lines : list(str) or None, optional
Line names/representatives of lines to calculate line loading for. If
None, line loading is calculated for all lines in the network.
Default: None.
timesteps : :pandas:`pandas.Timestamp<Timestamp>` or \
list(:pandas:`pandas.Timestamp<Timestamp>`) or None, optional
Specifies time steps to calculate line loading for. If timesteps is
None, all time steps power flow analysis was conducted for are used.
Default: None.
Returns
--------
:pandas:`pandas.DataFrame<DataFrame>`
Dataframe with relative line loading (unitless). Index of
the dataframe is a :pandas:`pandas.DatetimeIndex<DatetimeIndex>`,
columns are the line representatives.
"""
if timesteps is None:
timesteps = edisgo_obj.results.i_res.index
# check if timesteps is array-like, otherwise convert to list
if not hasattr(timesteps, "__len__"):
timesteps = [timesteps]
if lines is not None:
line_indices = lines
else:
line_indices = edisgo_obj.topology.lines_df.index
mv_lines_allowed_load = check_tech_constraints.lines_allowed_load(edisgo_obj, "mv")
lv_lines_allowed_load = check_tech_constraints.lines_allowed_load(edisgo_obj, "lv")
lines_allowed_load = pd.concat(
[mv_lines_allowed_load, lv_lines_allowed_load], axis=1, sort=False
).loc[timesteps, line_indices]
return check_tech_constraints.lines_relative_load(edisgo_obj, lines_allowed_load)
[docs]def calculate_line_reactance(line_inductance_per_km, line_length, num_parallel):
"""
Calculates line reactance in Ohm.
Parameters
----------
line_inductance_per_km : float or array-like
Line inductance in mH/km.
line_length : float
Length of line in km.
num_parallel : int
Number of parallel lines.
Returns
-------
float
Reactance in Ohm
"""
return line_inductance_per_km / 1e3 * line_length * 2 * pi * 50 / num_parallel
[docs]def calculate_line_resistance(line_resistance_per_km, line_length, num_parallel):
"""
Calculates line resistance in Ohm.
Parameters
----------
line_resistance_per_km : float or array-like
Line resistance in Ohm/km.
line_length : float
Length of line in km.
num_parallel : int
Number of parallel lines.
Returns
-------
float
Resistance in Ohm
"""
return line_resistance_per_km * line_length / num_parallel
[docs]def calculate_line_susceptance(line_capacitance_per_km, line_length, num_parallel):
"""
Calculates line shunt susceptance in Siemens.
Parameters
----------
line_capacitance_per_km : float
Line capacitance in uF/km.
line_length : float
Length of line in km.
num_parallel : int
Number of parallel lines.
Returns
-------
float
Shunt susceptance in Siemens.
"""
return line_capacitance_per_km / 1e6 * line_length * 2 * pi * 50 * num_parallel
[docs]def calculate_apparent_power(nominal_voltage, current, num_parallel):
"""
Calculates apparent power in MVA from given voltage and current.
Parameters
----------
nominal_voltage : float or array-like
Nominal voltage in kV.
current : float or array-like
Current in kA.
num_parallel : int or array-like
Number of parallel lines.
Returns
-------
float
Apparent power in MVA.
"""
return sqrt(3) * nominal_voltage * current * num_parallel
[docs]def drop_duplicated_indices(dataframe, keep="first"):
"""
Drop rows of duplicate indices in dataframe.
Parameters
----------
dataframe::pandas:`pandas.DataFrame<DataFrame>`
handled dataframe
keep: str
indicator of row to be kept, 'first', 'last' or False,
see pandas.DataFrame.drop_duplicates() method
"""
return dataframe[~dataframe.index.duplicated(keep=keep)]
[docs]def drop_duplicated_columns(df, keep="first"):
"""
Drop columns of dataframe that appear more than once.
Parameters
----------
df : :pandas:`pandas.DataFrame<DataFrame>`
Dataframe of which columns are dropped.
keep : str
Indicator of whether to keep first ('first'), last ('last') or
none (False) of the duplicated columns.
See `drop_duplicates()` method of
:pandas:`pandas.DataFrame<DataFrame>`.
"""
return df.loc[:, ~df.columns.duplicated(keep=keep)]
[docs]def select_cable(edisgo_obj, level, apparent_power):
"""
Selects suitable cable type and quantity using given apparent power.
Cable is selected to be able to carry the given `apparent_power`, no load
factor is considered. Overhead lines are not considered in choosing a
suitable cable.
Parameters
----------
edisgo_obj : :class:`~.EDisGo`
level : str
Grid level to get suitable cable for. Possible options are 'mv' or
'lv'.
apparent_power : float
Apparent power the cable must carry in MVA.
Returns
-------
:pandas:`pandas.Series<Series>`
Series with attributes of selected cable as in equipment data and
cable type as series name.
int
Number of necessary parallel cables.
"""
cable_count = 1
if level == "mv":
cable_data = edisgo_obj.topology.equipment_data["mv_cables"]
available_cables = cable_data[
cable_data["U_n"] == edisgo_obj.topology.mv_grid.nominal_voltage
]
elif level == "lv":
available_cables = edisgo_obj.topology.equipment_data["lv_cables"]
else:
raise ValueError(
"Specified voltage level is not valid. Must either be 'mv' or 'lv'."
)
suitable_cables = available_cables[
calculate_apparent_power(
available_cables["U_n"], available_cables["I_max_th"], cable_count
)
> apparent_power
]
# increase cable count until appropriate cable type is found
while suitable_cables.empty and cable_count < 7:
cable_count += 1
suitable_cables = available_cables[
calculate_apparent_power(
available_cables["U_n"],
available_cables["I_max_th"],
cable_count,
)
> apparent_power
]
if suitable_cables.empty:
raise exceptions.MaximumIterationError(
"Could not find a suitable cable for apparent power of "
"{} MVA.".format(apparent_power)
)
cable_type = suitable_cables.loc[suitable_cables["I_max_th"].idxmin()]
return cable_type, cable_count
[docs]def assign_feeder(edisgo_obj, mode="mv_feeder"):
"""
Assigns MV or LV feeder to each bus and line, depending on the `mode`.
The feeder name is written to a new column `mv_feeder` or `lv_feeder`
in :class:`~.network.topology.Topology`'s
:attr:`~.network.topology.Topology.buses_df` and
:attr:`~.network.topology.Topology.lines_df`. The MV respectively LV feeder
name corresponds to the name of the first bus in the respective feeder.
Parameters
-----------
edisgo_obj : :class:`~.EDisGo`
mode : str
Specifies whether to assign MV or LV feeder. Valid options are
'mv_feeder' or 'lv_feeder'. Default: 'mv_feeder'.
"""
def _assign_to_busses(graph, station):
# get all buses in network and remove station to get separate subgraphs
graph_nodes = list(graph.nodes())
graph_nodes.remove(station)
subgraph = graph.subgraph(graph_nodes)
for neighbor in graph.neighbors(station):
# get all nodes in that feeder by doing a DFS in the disconnected
# subgraph starting from the node adjacent to the station
# `neighbor`
subgraph_neighbor = nx.dfs_tree(subgraph, source=neighbor)
for node in subgraph_neighbor.nodes():
edisgo_obj.topology.buses_df.at[node, mode] = neighbor
# in case of an LV station, assign feeder to all nodes in that
# LV network (only applies when mode is 'mv_feeder'
if node.split("_")[0] == "BusBar" and node.split("_")[-1] == "MV":
lvgrid = edisgo_obj.topology.get_lv_grid(int(node.split("_")[-2]))
edisgo_obj.topology.buses_df.loc[
lvgrid.buses_df.index, mode
] = neighbor
def _assign_to_lines(lines):
edisgo_obj.topology.lines_df.loc[
lines, mode
] = edisgo_obj.topology.lines_df.loc[lines].apply(
lambda _: edisgo_obj.topology.buses_df.at[_.bus0, mode], axis=1
)
tmp = edisgo_obj.topology.lines_df.loc[lines]
lines_nan = tmp[tmp.loc[lines, mode].isna()].index
edisgo_obj.topology.lines_df.loc[
lines_nan, mode
] = edisgo_obj.topology.lines_df.loc[lines_nan].apply(
lambda _: edisgo_obj.topology.buses_df.at[_.bus1, mode], axis=1
)
if mode == "mv_feeder":
graph = edisgo_obj.topology.mv_grid.graph
station = edisgo_obj.topology.mv_grid.station.index[0]
_assign_to_busses(graph, station)
lines = edisgo_obj.topology.lines_df.index
_assign_to_lines(lines)
elif mode == "lv_feeder":
for lv_grid in edisgo_obj.topology.mv_grid.lv_grids:
graph = lv_grid.graph
station = lv_grid.station.index[0]
_assign_to_busses(graph, station)
lines = lv_grid.lines_df.index
_assign_to_lines(lines)
else:
raise ValueError(
"Invalid mode. Mode must either be 'mv_feeder' or 'lv_feeder'."
)
[docs]def get_path_length_to_station(edisgo_obj):
"""
Determines path length from each bus to HV-MV station.
The path length is written to a new column `path_length_to_station` in
`buses_df` dataframe of :class:`~.network.topology.Topology` class.
Parameters
-----------
edisgo_obj : :class:`~.EDisGo`
Returns
-------
:pandas:`pandas.Series<Series>`
Series with bus name in index and path length to station as value.
"""
graph = edisgo_obj.topology.mv_grid.graph
mv_station = edisgo_obj.topology.mv_grid.station.index[0]
for bus in edisgo_obj.topology.mv_grid.buses_df.index:
path = nx.shortest_path(graph, source=mv_station, target=bus)
edisgo_obj.topology.buses_df.at[bus, "path_length_to_station"] = len(path) - 1
if bus.split("_")[0] == "BusBar" and bus.split("_")[-1] == "MV":
# check if there is an underlying LV grid
lv_grid_id = int(bus.split("_")[-2])
if lv_grid_id in edisgo_obj.topology._lv_grid_ids:
lvgrid = edisgo_obj.topology.get_lv_grid(lv_grid_id)
lv_graph = lvgrid.graph
lv_station = lvgrid.station.index[0]
for bus in lvgrid.buses_df.index:
lv_path = nx.shortest_path(lv_graph, source=lv_station, target=bus)
edisgo_obj.topology.buses_df.at[
bus, "path_length_to_station"
] = len(path) + len(lv_path)
return edisgo_obj.topology.buses_df.path_length_to_station
[docs]def get_downstream_buses(edisgo_obj, comp_name, comp_type="bus"):
"""
Returns all buses downstream (farther away from station) of the given bus or line.
In case a bus is given, returns all buses downstream of the given bus plus the
given bus itself.
In case a line is given, returns all buses downstream of the bus that is closer to
the station (thus only one bus of the line is included in the returned buses).
Parameters
------------
edisgo_obj : EDisGo object
comp_name : str
Name of bus or line (as in index of :attr:`~.network.topology.Topology.buses_df`
or :attr:`~.network.topology.Topology.lines_df`) to get downstream buses for.
comp_type : str
Can be either 'bus' or 'line'. Default: 'bus'.
Returns
-------
list(str)
List of buses (as in index of :attr:`~.network.topology.Topology.buses_df`)
downstream of the given component.
"""
graph = edisgo_obj.topology.to_graph()
station_node = edisgo_obj.topology.transformers_hvmv_df.bus1.values[0]
if comp_type == "bus":
# get upstream bus to determine which edge to remove to create subgraph
bus = comp_name
path_to_station = nx.shortest_path(graph, station_node, comp_name)
bus_upstream = path_to_station[-2]
elif comp_type == "line":
# get bus further downstream to determine which buses downstream are affected
bus0 = edisgo_obj.topology.lines_df.at[comp_name, "bus0"]
bus1 = edisgo_obj.topology.lines_df.at[comp_name, "bus1"]
path_to_station_bus0 = nx.shortest_path(graph, station_node, bus0)
path_to_station_bus1 = nx.shortest_path(graph, station_node, bus1)
bus = bus0 if len(path_to_station_bus0) > len(path_to_station_bus1) else bus1
bus_upstream = bus0 if bus == bus1 else bus1
else:
return None
# remove edge between bus and next bus upstream
graph.remove_edge(bus, bus_upstream)
# get subgraph containing relevant bus
subgraphs = list(graph.subgraph(c) for c in nx.connected_components(graph))
for subgraph in subgraphs:
if bus in subgraph.nodes():
return list(subgraph.nodes())
return None
[docs]def assign_voltage_level_to_component(df, buses_df):
"""
Adds column with specification of voltage level component is in.
The voltage level ('mv' or 'lv') is determined based on the nominal
voltage of the bus the component is connected to. If the nominal voltage
is smaller than 1 kV, voltage level 'lv' is assigned, otherwise 'mv' is
assigned.
Parameters
----------
df : :pandas:`pandas.DataFrame<DataFrame>`
Dataframe with component names in the index. Only required column is
column 'bus', giving the name of the bus the component is connected to.
buses_df : :pandas:`pandas.DataFrame<DataFrame>`
Dataframe with bus information. Bus names are in the index. Only required column
is column 'v_nom', giving the nominal voltage of the voltage level the
bus is in.
Returns
--------
:pandas:`pandas.DataFrame<DataFrame>`
Same dataframe as given in parameter `df` with new column
'voltage_level' specifying the voltage level the component is in
(either 'mv' or 'lv').
"""
df["voltage_level"] = df.apply(
lambda _: "lv" if buses_df.at[_.bus, "v_nom"] < 1 else "mv",
axis=1,
)
return df
[docs]def get_weather_cells_intersecting_with_grid_district(edisgo_obj):
"""
Get all weather cells that intersect with the grid district.
Parameters
----------
edisgo_obj : :class:`~.EDisGo`
Returns
-------
set
Set with weather cell IDs
"""
# Download geometries of weather cells
srid = edisgo_obj.topology.grid_district["srid"]
table = climate.Cosmoclmgrid
with session_scope() as session:
query = session.query(
table.gid,
func.ST_AsText(func.ST_Transform(table.geom, srid)).label("geometry"),
)
geom_data = pd.read_sql_query(query.statement, query.session.bind)
geom_data.geometry = geom_data.apply(lambda _: wkt_loads(_.geometry), axis=1)
geom_data = gpd.GeoDataFrame(geom_data, crs=f"EPSG:{srid}")
# Make sure MV Geometry is MultiPolygon
mv_geom = edisgo_obj.topology.grid_district["geom"]
if mv_geom.geom_type == "Polygon":
# Transform Polygon to MultiPolygon and overwrite geometry
p = wkt_loads(str(mv_geom))
m = MultiPolygon([p])
edisgo_obj.topology.grid_district["geom"] = m
elif mv_geom.geom_type == "MultiPolygon":
m = mv_geom
else:
raise ValueError(
f"Grid district geometry is of type {type(mv_geom)}."
" Only Shapely Polygon or MultiPolygon are accepted."
)
mv_geom_gdf = gpd.GeoDataFrame(data={"geometry": [m]}, crs=f"EPSG:{srid}")
return set(
np.append(
gpd.sjoin(
geom_data, mv_geom_gdf, how="right", op="intersects"
).gid.unique(),
edisgo_obj.topology.generators_df.weather_cell_id.dropna().unique(),
)
)
[docs]def get_directory_size(start_dir):
"""
Calculates the size of all files within the start path.
Walks through all files and sub-directories within a given directory and
calculate the sum of size of all files in the directory.
See also
`stackoverflow <https://stackoverflow.com/questions/1392413/\
calculating-a-directorys-size-using-python/1392549#1392549>`_.
Parameters
----------
start_dir : str
Start path.
Returns
-------
int
Size of the directory.
"""
total_size = 0
for dirpath, dirnames, filenames in os.walk(start_dir):
for f in filenames:
fp = os.path.join(dirpath, f)
# skip if it is symbolic link
if not os.path.islink(fp):
total_size += os.path.getsize(fp)
return total_size
[docs]def get_files_recursive(path, files=None):
"""
Recursive function to get all files in a given path and its sub directories.
Parameters
----------
path : str
Directory to start from.
files : list, optional
List of files to start with. Default: None.
Returns
-------
"""
if files is None:
files = []
for f in os.listdir(path):
file = os.path.join(path, f)
if os.path.isdir(file):
files = get_files_recursive(file, files)
else:
files.append(file)
return files
[docs]def add_line_susceptance(
edisgo_obj,
mode="mv_b",
):
"""
Adds line susceptance information in Siemens to lines in existing grids.
Parameters
----------
edisgo_obj : :class:`~.EDisGo`
EDisGo object to which line susceptance information is added.
mode : str
Defines how the susceptance is added:
* 'no_b'
Susceptance is set to 0 for all lines.
* 'mv_b' (Default)
Susceptance is for the MV lines set according to the equipment parameters
and for the LV lines it is set to zero.
* 'all_b'
Susceptance is for the MV lines set according to the equipment parameters
and for the LV lines 0.25 uF/km is chosen.
Returns
-------
:class:`~.EDisGo`
"""
line_data_df = pd.concat(
[
edisgo_obj.topology.equipment_data["mv_overhead_lines"],
edisgo_obj.topology.equipment_data["mv_cables"],
edisgo_obj.topology.equipment_data["lv_cables"],
]
)
if mode == "no_b":
line_data_df.loc[:, "C_per_km"] = 0
elif mode == "mv_b":
line_data_df.loc[
edisgo_obj.topology.equipment_data["lv_cables"].index, "C_per_km"
] = 0
elif mode == "all_b":
line_data_df.loc[
edisgo_obj.topology.equipment_data["lv_cables"].index, "C_per_km"
] = 0.25
else:
raise ValueError("Non-existing mode.")
lines_df = edisgo_obj.topology.lines_df
buses_df = edisgo_obj.topology.buses_df
for index, bus0, type_info, length, num_parallel in lines_df[
["bus0", "type_info", "length", "num_parallel"]
].itertuples():
v_nom = buses_df.loc[bus0].v_nom
try:
line_capacitance_per_km = (
line_data_df.loc[line_data_df.U_n == v_nom].loc[type_info].C_per_km
)
except KeyError:
line_capacitance_per_km = line_data_df.loc[type_info].C_per_km
logger.warning(f"False voltage level for line {index}.")
lines_df.loc[index, "b"] = calculate_line_susceptance(
line_capacitance_per_km, length, num_parallel
)
return edisgo_obj
[docs]def resample(
object, freq_orig, method: str = "ffill", freq: str | pd.Timedelta = "15min"
):
"""
Resamples all time series data in given object to a desired resolution.
Parameters
----------
object : :class:`~.network.timeseries.TimeSeries`
Object of which to resample time series data.
freq_orig : :pandas:`pandas.Timedelta<Timedelta>`
Frequency of original time series data.
method : str, optional
See `method` parameter in :attr:`~.EDisGo.resample_timeseries` for more
information.
freq : str, optional
See `freq` parameter in :attr:`~.EDisGo.resample_timeseries` for more
information.
"""
# add time step at the end of the time series in case of up-sampling so that
# last time interval in the original time series is still included
df_dict = {}
for attr in object._attributes:
if not getattr(object, attr).empty:
df_dict[attr] = getattr(object, attr)
if pd.Timedelta(freq) < freq_orig: # up-sampling
new_dates = pd.DatetimeIndex([df_dict[attr].index[-1] + freq_orig])
else: # down-sampling
new_dates = pd.DatetimeIndex([df_dict[attr].index[-1]])
df_dict[attr] = (
df_dict[attr]
.reindex(df_dict[attr].index.union(new_dates).unique().sort_values())
.ffill()
)
# resample time series
if pd.Timedelta(freq) < freq_orig: # up-sampling
if method == "interpolate":
for attr in df_dict.keys():
setattr(
object,
attr,
df_dict[attr].resample(freq, closed="left").interpolate().iloc[:-1],
)
elif method == "ffill":
for attr in df_dict.keys():
setattr(
object,
attr,
df_dict[attr].resample(freq, closed="left").ffill().iloc[:-1],
)
elif method == "bfill":
for attr in df_dict.keys():
setattr(
object,
attr,
df_dict[attr].resample(freq, closed="left").bfill().iloc[:-1],
)
else:
raise NotImplementedError(f"Resampling method {method} is not implemented.")
else: # down-sampling
for attr in df_dict.keys():
setattr(
object,
attr,
df_dict[attr].resample(freq).mean(),
)