import os
import pandas as pd
import numpy as np
from math import sqrt
import logging
import datetime
from pyomo.environ import Constraint
import networkx as nx
import csv
from pypsa import Network as PyPSANetwork
import edisgo
from edisgo.tools import config, tools
from edisgo.tools import pypsa_io_lopf, pypsa_io
from edisgo.data.import_data import import_from_ding0, import_generators, \
import_feedin_timeseries, import_load_timeseries
from edisgo.flex_opt.reinforce_grid import reinforce_grid
from edisgo.flex_opt import storage_integration, storage_operation, \
curtailment, storage_positioning
from edisgo.grid.components import Station, BranchTee, Generator, Load, \
GeneratorFluctuating
from edisgo.grid.tools import get_gen_info, disconnect_storage
from edisgo.grid.grids import MVGrid
from edisgo.tools import plots
logger = logging.getLogger('edisgo')
class EDisGoReimport:
    """
    EDisGo class created from saved results.

    Parameters
    ----------
    results_path : :obj:`str`
        Path to directory with saved results to restore the network from.

    """

    def __init__(self, results_path, **kwargs):
        if os.path.isdir(results_path):
            # create network
            self.network = NetworkReimport(results_path, **kwargs)
        else:
            logger.error('Results cannot be imported as the specified '
                         'directory {} does not exist.'.format(results_path))

    def plot_mv_grid_topology(self, technologies=False, **kwargs):
        """
        Plots plain MV grid topology and optionally nodes by technology type
        (e.g. station or generator).

        Parameters
        ----------
        technologies : :obj:`Boolean`
            If True plots stations, generators, etc. in the grid in different
            colors. If False does not plot any nodes. Default: False.

        For more information see :func:`edisgo.tools.plots.mv_grid_topology`.

        """
        if self.network.pypsa is None:
            try:
                # a pypsa representation is needed for plotting; build it
                # from the time series if possible
                timesteps = self.network.timeseries.timeindex
                self.network.pypsa = pypsa_io.to_pypsa(
                    self.network, mode=None, timesteps=timesteps)
            except Exception:
                logger.warning(
                    "pypsa representation of MV grid needed to plot MV "
                    "grid topology.")

        if self.network.pypsa is not None:
            plots.mv_grid_topology(
                self.network.pypsa, self.network.config,
                node_color='technology' if technologies is True else None,
                filename=kwargs.get('filename', None),
                grid_district_geom=kwargs.get('grid_district_geom', True),
                background_map=kwargs.get('background_map', True),
                xlim=kwargs.get('xlim', None), ylim=kwargs.get('ylim', None),
                title=kwargs.get('title', ''))

    def plot_mv_voltages(self, **kwargs):
        """
        Plots voltages in MV grid on grid topology plot.

        For more information see :func:`edisgo.tools.plots.mv_grid_topology`.

        """
        if self.network.pypsa is not None:
            try:
                v_res = self.network.results.v_res()
            except Exception:
                logger.warning("Voltages `pfa_v_mag_pu` from power flow "
                               "analysis must be available to plot them.")
                return
            plots.mv_grid_topology(
                self.network.pypsa, self.network.config,
                timestep=kwargs.get('timestep', None),
                node_color='voltage',
                filename=kwargs.get('filename', None),
                grid_district_geom=kwargs.get('grid_district_geom', True),
                background_map=kwargs.get('background_map', True),
                voltage=v_res,
                limits_cb_nodes=kwargs.get('limits_cb_nodes', None),
                xlim=kwargs.get('xlim', None), ylim=kwargs.get('ylim', None),
                title=kwargs.get('title', ''))
        else:
            logger.warning("pypsa representation of MV grid needed to "
                           "plot voltages.")

    def plot_mv_line_loading(self, **kwargs):
        """
        Plots relative line loading (current from power flow analysis to
        allowed current) of MV lines.

        For more information see :func:`edisgo.tools.plots.mv_grid_topology`.

        """
        if self.network.pypsa is not None and \
                self.network.results.i_res is not None:
            plots.mv_grid_topology(
                self.network.pypsa, self.network.config,
                timestep=kwargs.get('timestep', None),
                line_color='loading',
                node_color=kwargs.get('node_color', None),
                line_load=self.network.results.i_res,
                filename=kwargs.get('filename', None),
                arrows=kwargs.get('arrows', None),
                grid_district_geom=kwargs.get('grid_district_geom', True),
                background_map=kwargs.get('background_map', True),
                voltage=self.network.results.v_res(),
                limits_cb_lines=kwargs.get('limits_cb_lines', None),
                limits_cb_nodes=kwargs.get('limits_cb_nodes', None),
                xlim=kwargs.get('xlim', None), ylim=kwargs.get('ylim', None),
                lines_cmap=kwargs.get('lines_cmap', 'inferno_r'),
                title=kwargs.get('title', ''),
                scaling_factor_line_width=kwargs.get(
                    'scaling_factor_line_width', None))
        else:
            if self.network.pypsa is None:
                logger.warning("pypsa representation of MV grid needed to "
                               "plot line loading.")
            if self.network.results.i_res is None:
                logger.warning("Currents `i_res` from power flow analysis "
                               "must be available to plot line loading.")

    def plot_mv_grid_expansion_costs(self, **kwargs):
        """
        Plots costs per MV line.

        For more information see :func:`edisgo.tools.plots.mv_grid_topology`.

        """
        if self.network.pypsa is not None and \
                self.network.results.grid_expansion_costs is not None:
            if isinstance(self, EDisGo):
                # convert index of grid expansion costs from component
                # objects to their string representation
                grid_expansion_costs = \
                    self.network.results.grid_expansion_costs.reset_index()
                grid_expansion_costs['index'] = \
                    grid_expansion_costs['index'].apply(lambda _: repr(_))
                grid_expansion_costs.set_index('index', inplace=True)
            else:
                # reimported results already use string indices
                grid_expansion_costs = \
                    self.network.results.grid_expansion_costs

            plots.mv_grid_topology(
                self.network.pypsa, self.network.config,
                line_color='expansion_costs',
                grid_expansion_costs=grid_expansion_costs,
                filename=kwargs.get('filename', None),
                grid_district_geom=kwargs.get('grid_district_geom', True),
                background_map=kwargs.get('background_map', True),
                limits_cb_lines=kwargs.get('limits_cb_lines', None),
                xlim=kwargs.get('xlim', None), ylim=kwargs.get('ylim', None),
                lines_cmap=kwargs.get('lines_cmap', 'inferno_r'),
                title=kwargs.get('title', ''),
                scaling_factor_line_width=kwargs.get(
                    'scaling_factor_line_width', None)
            )
        else:
            if self.network.pypsa is None:
                logger.warning("pypsa representation of MV grid needed to "
                               "plot grid expansion costs.")
            if self.network.results.grid_expansion_costs is None:
                logger.warning("Grid expansion cost results needed to plot "
                               "them.")

    def plot_mv_storage_integration(self, **kwargs):
        """
        Plots storage position in MV grid of integrated storages.

        For more information see :func:`edisgo.tools.plots.mv_grid_topology`.

        """
        if self.network.pypsa is not None:
            plots.mv_grid_topology(
                self.network.pypsa, self.network.config,
                node_color='storage_integration',
                filename=kwargs.get('filename', None),
                grid_district_geom=kwargs.get('grid_district_geom', True),
                background_map=kwargs.get('background_map', True),
                xlim=kwargs.get('xlim', None), ylim=kwargs.get('ylim', None),
                title=kwargs.get('title', ''))
        else:
            logger.warning("pypsa representation of MV grid needed to "
                           "plot storage integration in MV grid.")

    def histogram_voltage(self, timestep=None, title=True, **kwargs):
        """
        Plots histogram of voltages.

        For more information on the histogram plot and possible configurations
        see :func:`edisgo.tools.plots.histogram`.

        Parameters
        ----------
        timestep : :pandas:`pandas.Timestamp<timestamp>` or list(:pandas:`pandas.Timestamp<timestamp>`) or None, optional
            Specifies time steps histogram is plotted for. If timestep is None
            all time steps voltages are calculated for are used. Default: None.
        title : :obj:`str` or :obj:`bool`, optional
            Title for plot. If True title is auto generated. If False plot has
            no title. If :obj:`str`, the provided title is used. Default: True.

        """
        data = self.network.results.v_res()

        if timestep is None:
            timestep = data.index
        # check if timesteps is array-like, otherwise convert to list
        if not hasattr(timestep, "__len__"):
            timestep = [timestep]

        if title is True:
            if len(timestep) == 1:
                title = "Voltage histogram for time step {}".format(
                    timestep[0])
            else:
                title = "Voltage histogram \nfor time steps {} to {}".format(
                    timestep[0], timestep[-1])
        elif title is False:
            title = None

        plots.histogram(data=data, title=title, timeindex=timestep, **kwargs)

    def histogram_relative_line_load(self, timestep=None, title=True,
                                     voltage_level='mv_lv', **kwargs):
        """
        Plots histogram of relative line loads.

        For more information on how the relative line load is calculated see
        :func:`edisgo.tools.tools.get_line_loading_from_network`.
        For more information on the histogram plot and possible configurations
        see :func:`edisgo.tools.plots.histogram`.

        Parameters
        ----------
        timestep : :pandas:`pandas.Timestamp<timestamp>` or list(:pandas:`pandas.Timestamp<timestamp>`) or None, optional
            Specifies time step(s) histogram is plotted for. If `timestep` is
            None all time steps currents are calculated for are used.
            Default: None.
        title : :obj:`str` or :obj:`bool`, optional
            Title for plot. If True title is auto generated. If False plot has
            no title. If :obj:`str`, the provided title is used. Default: True.
        voltage_level : :obj:`str`
            Specifies which voltage level to plot voltage histogram for.
            Possible options are 'mv', 'lv' and 'mv_lv'. 'mv_lv' is also the
            fallback option in case of wrong input. Default: 'mv_lv'

        """
        # lines with v_nom above 1 kV are MV lines, below are LV lines
        if voltage_level == 'mv':
            lines = self.network.pypsa.lines.loc[
                self.network.pypsa.lines.v_nom > 1]
        elif voltage_level == 'lv':
            lines = self.network.pypsa.lines.loc[
                self.network.pypsa.lines.v_nom < 1]
        else:
            lines = self.network.pypsa.lines

        rel_line_loading = tools.calculate_relative_line_load(
            self.network.pypsa, self.network.config,
            self.network.results.i_res, self.network.pypsa.lines.v_nom,
            lines.index, timestep)

        if timestep is None:
            timestep = rel_line_loading.index
        # check if timesteps is array-like, otherwise convert to list
        if not hasattr(timestep, "__len__"):
            timestep = [timestep]

        if title is True:
            if len(timestep) == 1:
                title = "Relative line load histogram for time step {}".format(
                    timestep[0])
            else:
                title = "Relative line load histogram \nfor time steps " \
                        "{} to {}".format(timestep[0], timestep[-1])
        elif title is False:
            title = None

        plots.histogram(data=rel_line_loading, title=title, **kwargs)
class EDisGo(EDisGoReimport):
    """
    Provides the top-level API for invocation of data import, analysis of
    hosting capacity, grid reinforcement and flexibility measures.

    Parameters
    ----------
    worst_case_analysis : None or :obj:`str`, optional
        If not None time series for feed-in and load will be generated
        according to the chosen worst case analysis.
        Possible options are:

        * 'worst-case'
          feed-in for the two worst-case scenarios feed-in case and load case
          are generated
        * 'worst-case-feedin'
          feed-in for the worst-case scenario feed-in case is generated
        * 'worst-case-load'
          feed-in for the worst-case scenario load case is generated

        Be aware that if you choose to conduct a worst-case analysis your
        input for the following parameters will not be used:

        * `timeseries_generation_fluctuating`
        * `timeseries_generation_dispatchable`
        * `timeseries_load`

    mv_grid_id : :obj:`str`
        MV grid ID used in import of ding0 grid.

        .. ToDo: explain where MV grid IDs come from

    ding0_grid : file: :obj:`str` or :class:`ding0.core.NetworkDing0`
        If a str is provided it is assumed it points to a pickle with Ding0
        grid data. This file will be read. If an object of the type
        :class:`ding0.core.NetworkDing0` data will be used directly from this
        object.
        This will probably be removed when ding0 grids are in oedb.
    config_path : None or :obj:`str` or :obj:`dict`
        Path to the config directory. Options are:

        * None
          If `config_path` is None configs are loaded from the edisgo
          default config directory ($HOME$/.edisgo). If the directory
          does not exist it is created. If config files don't exist the
          default config files are copied into the directory.
        * :obj:`str`
          If `config_path` is a string configs will be loaded from the
          directory specified by `config_path`. If the directory
          does not exist it is created. If config files don't exist the
          default config files are copied into the directory.
        * :obj:`dict`
          A dictionary can be used to specify different paths to the
          different config files. The dictionary must have the following
          keys:

          * 'config_db_tables'
          * 'config_grid'
          * 'config_grid_expansion'
          * 'config_timeseries'

          Values of the dictionary are paths to the corresponding
          config file. In contrast to the other two options the directories
          and config files must exist and are not automatically created.

        Default: None.
    scenario_description : None or :obj:`str`
        Can be used to describe your scenario but is not used for anything
        else. Default: None.
    timeseries_generation_fluctuating : :obj:`str` or :pandas:`pandas.DataFrame<dataframe>`
        Parameter used to obtain time series for active power feed-in of
        fluctuating renewables wind and solar.
        Possible options are:

        * 'oedb'
          Time series for the year 2011 are obtained from the OpenEnergy
          DataBase.
        * :pandas:`pandas.DataFrame<dataframe>`
          DataFrame with time series, normalized with corresponding capacity.
          Time series can either be aggregated by technology type or by type
          and weather cell ID. In the first case columns of the DataFrame are
          'solar' and 'wind'; in the second case columns need to be a
          :pandas:`pandas.MultiIndex<multiindex>` with the first level
          containing the type and the second level the weather cell id.
          Index needs to be a :pandas:`pandas.DatetimeIndex<datetimeindex>`.

          .. ToDo: explain how to obtain weather cell id,

          .. ToDo: add link to explanation of weather cell id

    timeseries_generation_dispatchable : :pandas:`pandas.DataFrame<dataframe>`
        DataFrame with time series for active power of each (aggregated)
        type of dispatchable generator normalized with corresponding capacity.
        Index needs to be a :pandas:`pandas.DatetimeIndex<datetimeindex>`.
        Columns represent generator type:

        * 'gas'
        * 'coal'
        * 'biomass'
        * 'other'
        * ...

        Use 'other' if you don't want to explicitly provide every possible
        type.
    timeseries_generation_reactive_power : :pandas:`pandas.DataFrame<dataframe>`, optional
        DataFrame with time series of normalized reactive power (normalized by
        the rated nominal active power) per technology and weather cell. Index
        needs to be a :pandas:`pandas.DatetimeIndex<datetimeindex>`.
        Columns represent generator type and can be a MultiIndex column
        containing the weather cell ID in the second level. If the technology
        doesn't contain weather cell information i.e. if it is other than solar
        and wind generation, this second level can be left as a numpy Nan or a
        None.
        Default: None.
        If no time series for the technology or technology and weather cell ID
        is given, reactive power will be calculated from power factor and
        power factor mode in the config sections `reactive_power_factor` and
        `reactive_power_mode` and a warning will be raised. See
        :class:`~.grid.components.Generator` and
        :class:`~.grid.components.GeneratorFluctuating` for more information.
    timeseries_load : :obj:`str` or :pandas:`pandas.DataFrame<dataframe>`
        Parameter used to obtain time series of active power of (cumulative)
        loads.
        Possible options are:

        * 'demandlib'
          Time series for the year specified in `timeindex` are
          generated using the oemof demandlib.
        * :pandas:`pandas.DataFrame<dataframe>`
          DataFrame with load time series of each (cumulative) type of load
          normalized with corresponding annual energy demand. Index needs to
          be a :pandas:`pandas.DatetimeIndex<datetimeindex>`.
          Columns represent load type:

          * 'residential'
          * 'retail'
          * 'industrial'
          * 'agricultural'

    timeseries_load_reactive_power : :pandas:`pandas.DataFrame<dataframe>`, optional
        DataFrame with time series of normalized reactive power (normalized by
        annual energy demand) per load sector. Index needs to be a
        :pandas:`pandas.DatetimeIndex<datetimeindex>`.
        Columns represent load type:

        * 'residential'
        * 'retail'
        * 'industrial'
        * 'agricultural'

        Default: None.
        If no time series for the load sector is given, reactive power will be
        calculated from power factor and power factor mode in the config
        sections `reactive_power_factor` and `reactive_power_mode` and a
        warning will be raised. See :class:`~.grid.components.Load` for
        more information.
    generator_scenario : None or :obj:`str`
        If provided defines which scenario of future generator park to use
        and invokes import of these generators. Possible options are 'nep2035'
        and 'ego100'.

        .. ToDo: Add link to explanation of scenarios.

    timeindex : None or :pandas:`pandas.DatetimeIndex<datetimeindex>`
        Can be used to select time ranges of the feed-in and load time series
        that will be used in the power flow analysis. Also defines the year
        load time series are obtained for when choosing the 'demandlib' option
        to generate load time series.

    Attributes
    ----------
    network : :class:`~.grid.network.Network`
        The network is a container object holding all data.

    Examples
    --------
    Assuming you have the Ding0 `ding0_data.pkl` in CWD

    Create eDisGo Network object by loading Ding0 file

    >>> from edisgo.grid.network import EDisGo
    >>> edisgo = EDisGo(ding0_grid='ding0_data.pkl', mode='worst-case-feedin')

    Analyze hosting capacity for MV and LV grid level

    >>> edisgo.analyze()

    Print LV station secondary side voltage levels returned by PFA

    >>> lv_stations = edisgo.network.mv_grid.graph.nodes_by_attribute(
    >>>     'lv_station')
    >>> print(edisgo.network.results.v_res(lv_stations, 'lv'))

    """

    def __init__(self, **kwargs):
        # create network
        self.network = Network(
            generator_scenario=kwargs.get('generator_scenario', None),
            config_path=kwargs.get('config_path', None),
            scenario_description=kwargs.get('scenario_description', None))

        # load grid
        # ToDo: should at some point work with only MV grid ID
        self.import_from_ding0(kwargs.get('ding0_grid', None))

        # set up time series for feed-in and load
        if kwargs.get('worst_case_analysis', None):
            # worst-case time series
            self.network.timeseries = TimeSeriesControl(
                network=self.network,
                mode=kwargs.get('worst_case_analysis', None)).timeseries
        else:
            self.network.timeseries = TimeSeriesControl(
                network=self.network,
                timeseries_generation_fluctuating=kwargs.get(
                    'timeseries_generation_fluctuating', None),
                timeseries_generation_dispatchable=kwargs.get(
                    'timeseries_generation_dispatchable', None),
                timeseries_generation_reactive_power=kwargs.get(
                    'timeseries_generation_reactive_power', None),
                timeseries_load=kwargs.get(
                    'timeseries_load', None),
                timeseries_load_reactive_power=kwargs.get(
                    'timeseries_load_reactive_power', None),
                timeindex=kwargs.get('timeindex', None)).timeseries

        # import new generators
        if self.network.generator_scenario is not None:
            self.import_generators()

    def curtail(self, methodology, curtailment_timeseries, **kwargs):
        """
        Sets up curtailment time series.

        Curtailment time series are written into
        :class:`~.grid.network.TimeSeries`. See
        :class:`~.grid.network.CurtailmentControl` for more information on
        parameters and methodologies.

        """
        CurtailmentControl(edisgo=self, methodology=methodology,
                           curtailment_timeseries=curtailment_timeseries,
                           **kwargs)

    def import_from_ding0(self, file, **kwargs):
        """Import grid data from DINGO file

        For details see
        :func:`edisgo.data.import_data.import_from_ding0`

        """
        import_from_ding0(file=file, network=self.network)

    def import_generators(self, generator_scenario=None):
        """Import generators

        For details see
        :func:`edisgo.data.import_data.import_generators`

        """
        if generator_scenario:
            self.network.generator_scenario = generator_scenario
        data_source = 'oedb'
        import_generators(network=self.network, data_source=data_source)

    def analyze(self, mode=None, timesteps=None):
        """Analyzes the grid by power flow analysis

        Analyze the grid for violations of hosting capacity. Means, perform a
        power flow analysis and obtain voltages at nodes (load, generator,
        stations/transformers and branch tees) and active/reactive power at
        lines.

        The power flow analysis can currently only be performed for both grid
        levels MV and LV. See ToDos section for more information.

        A static `non-linear power flow analysis is performed using PyPSA
        <https://www.pypsa.org/doc/power_flow.html#full-non-linear-power-flow>`_.
        The high-voltage to medium-voltage transformer are not included in the
        analysis. The slack bus is defined at secondary side of these
        transformers assuming an ideal tap changer. Hence, potential
        overloading of the transformers is not studied here.

        Parameters
        ----------
        mode : str
            Allows to toggle between power flow analysis (PFA) on the whole
            grid topology (MV + LV), only MV or only LV. Defaults to None which
            equals power flow analysis for MV + LV which is the only
            implemented option at the moment. See ToDos section for
            more information.
        timesteps : :pandas:`pandas.DatetimeIndex<datetimeindex>` or :pandas:`pandas.Timestamp<timestamp>`
            Timesteps specifies for which time steps to conduct the power flow
            analysis. It defaults to None in which case the time steps in
            timeseries.timeindex (see :class:`~.grid.network.TimeSeries`) are
            used.

        Notes
        -----
        The current implementation always translates the grid topology
        representation to the PyPSA format and stores it to
        :attr:`self.network.pypsa`.

        ToDos
        ------
        The option to export only the edisgo MV grid (mode = 'mv') to conduct
        a power flow analysis is implemented in
        :func:`~.tools.pypsa_io.to_pypsa` but NotImplementedError is raised
        since the rest of edisgo does not handle this option yet. The analyze
        function will throw an error since
        :func:`~.tools.pypsa_io.process_pfa_results`
        does not handle aggregated loads and generators in the LV grids. Also,
        grid reinforcement, pypsa update of time series, and probably other
        functionalities do not work when only the MV grid is analysed.

        Further ToDos are:
        * explain how power plants are modeled, if possible use a link
        * explain where to find and adjust power flow analysis defining
        parameters

        See Also
        --------
        :func:`~.tools.pypsa_io.to_pypsa`
            Translator to PyPSA data format

        """
        if timesteps is None:
            timesteps = self.network.timeseries.timeindex
        # check if timesteps is array-like, otherwise convert to list
        if not hasattr(timesteps, "__len__"):
            timesteps = [timesteps]

        if self.network.pypsa is None:
            # Translate eDisGo grid topology representation to PyPSA format
            self.network.pypsa = pypsa_io.to_pypsa(
                self.network, mode, timesteps)
        elif self.network.pypsa.edisgo_mode != mode:
            # mode changed since last translation; translate again
            # (equality, not identity, must be used to compare modes)
            self.network.pypsa = pypsa_io.to_pypsa(
                self.network, mode, timesteps)

        # check if all timesteps are in pypsa.snapshots, if not update time
        # series
        if not all(step in self.network.pypsa.snapshots
                   for step in timesteps):
            pypsa_io.update_pypsa_timeseries(self.network, timesteps=timesteps)

        # run power flow analysis
        pf_results = self.network.pypsa.pf(timesteps)

        # '0' is the name pypsa assigns to the single sub-network here
        if all(pf_results['converged']['0'].tolist()):
            pypsa_io.process_pfa_results(
                self.network, self.network.pypsa, timesteps)
        else:
            raise ValueError("Power flow analysis did not converge.")

    def analyze_lopf(self, mode=None, timesteps=None,
                     etrago_max_storage_size=None):
        """Analyzes the grid by linear optimal power flow (LOPF)

        Translates the grid to the PyPSA format and runs a linear optimal
        power flow to size and position storages in the MV grid, subject to
        a constraint on the total installed storage capacity. The largest
        resulting storages are kept (their dispatch time series are written
        to the storage objects), all others are disconnected again. Results
        are additionally written to a plot and a csv file named
        'storage_results_<grid id>'.

        Parameters
        ----------
        mode : str
            See :meth:`~.grid.network.EDisGo.analyze` for more information.
        timesteps : :pandas:`pandas.DatetimeIndex<datetimeindex>` or :pandas:`pandas.Timestamp<timestamp>`
            Timesteps specifies for which time steps to conduct the LOPF.
            It defaults to None in which case the time steps in
            timeseries.timeindex (see :class:`~.grid.network.TimeSeries`) are
            used.
        etrago_max_storage_size : float
            Upper bound on the total installed storage capacity used as
            constraint in the optimization.

        See Also
        --------
        :meth:`~.grid.network.EDisGo.analyze`
            Notes and ToDos on the power flow analysis also apply here.
        :func:`~.tools.pypsa_io_lopf.to_pypsa`
            Translator to PyPSA data format

        """
        if timesteps is None:
            timesteps = self.network.timeseries.timeindex
        # check if timesteps is array-like, otherwise convert to list
        if not hasattr(timesteps, "__len__"):
            timesteps = [timesteps]

        # Translate eDisGo grid topology representation to PyPSA format
        logger.debug('Translate eDisGo grid topology representation to '
                     'PyPSA format.')
        self.network.pypsa_lopf = pypsa_io_lopf.to_pypsa(
            self.network, mode, timesteps)
        logger.debug('Translating eDisGo grid topology representation to '
                     'PyPSA format finished.')

        def extra_functionality(network, snapshots):
            # add total storage capacity constraint to the LOPF model
            model = network.model
            model.storages_p_nom = Constraint(
                rule=lambda model: sum(
                    model.generator_p_nom[s]
                    for s in self.network.pypsa_lopf.generators[
                        self.network.pypsa_lopf.generators.type ==
                        'Storage'].index) <= etrago_max_storage_size)

        # run linear optimal power flow
        self.network.pypsa_lopf.lopf(
            snapshots=timesteps, solver_name='cbc', keep_files=False,
            extra_functionality=extra_functionality,
            solver_options={'tee': True})
        print('objective: {}'.format(self.network.pypsa_lopf.objective))

        # relevant outputs
        # plot MV grid
        plots.storage_size(self.network.mv_grid, self.network.pypsa_lopf,
                           filename='storage_results_{}.pdf'.format(
                               self.network.id))
        storages = self.network.mv_grid.graph.nodes_by_attribute('storage')
        storages_repr = [repr(_) for _ in storages]
        print('Installed storage capacity: {} MW'.format(
            self.network.pypsa_lopf.generators.loc[
                storages_repr, 'p_nom_opt'].sum()))

        # export storage results (pypsa and path to storage)
        pypsa_storages_df = self.network.pypsa_lopf.generators.loc[
            storages_repr, :].sort_values(by=['p_nom_opt'], ascending=False)
        storage_repr = []
        storage_path = []
        for s in storages:
            storage_repr.append(repr(s))
            storage_path.append(nx.shortest_path(self.network.mv_grid.graph,
                                                 self.network.mv_grid.station,
                                                 s))
        graph_storages_df = pd.DataFrame({'path': storage_path},
                                         index=storage_repr)
        pypsa_storages_df.join(graph_storages_df).to_csv(
            'storage_results_{}.csv'.format(self.network.id))

        # take the largest storages and remove the rest
        max_number_storages = 8
        keep_storages = pypsa_storages_df.iloc[:max_number_storages, :].index
        remove_storages = pypsa_storages_df.iloc[max_number_storages, :].index \
            if False else pypsa_storages_df.iloc[max_number_storages:, :].index

        # write optimized dispatch time series to kept storages
        for s in keep_storages:
            keep_storage_obj = [_ for _ in storages if repr(_) == s][0]
            ts = self.network.pypsa_lopf.generators_t.p.loc[:, s]
            # pypsa works in MW, storage time series are in kW
            keep_storage_obj.timeseries = pd.DataFrame(
                {'p': ts * 1000,
                 'q': [0] * len(ts)},
                index=ts.index)
        # delete small storages
        for s in remove_storages:
            disconnect_storage(self.network,
                               [_ for _ in storages if repr(_) == s][0])

    def reinforce(self, **kwargs):
        """
        Reinforces the grid and calculates grid expansion costs.

        See :meth:`edisgo.flex_opt.reinforce_grid` for more information.

        """
        results = reinforce_grid(
            self, max_while_iterations=kwargs.get(
                'max_while_iterations', 10),
            copy_graph=kwargs.get('copy_graph', False),
            timesteps_pfa=kwargs.get('timesteps_pfa', None),
            combined_analysis=kwargs.get('combined_analysis', False))

        # add measure to Results object (only if the grid itself, not a
        # copy of the graph, was reinforced)
        if not kwargs.get('copy_graph', False):
            self.network.results.measures = 'grid_expansion'

        return results

    def integrate_storage(self, timeseries, position, **kwargs):
        """
        Integrates storage into grid.

        See :class:`~.grid.network.StorageControl` for more information.

        """
        StorageControl(edisgo=self, timeseries=timeseries,
                       position=position, **kwargs)
[docs]class Network:
"""
Used as container for all data related to a single
:class:`~.grid.grids.MVGrid`.
Parameters
----------
scenario_description : :obj:`str`, optional
Can be used to describe your scenario but is not used for anything
else. Default: None.
config_path : None or :obj:`str` or :obj:`dict`, optional
See :class:`~.grid.network.Config` for further information.
Default: None.
metadata : :obj:`dict`
Metadata of Network such as ?
data_sources : :obj:`dict` of :obj:`str`
Data Sources of grid, generators etc.
Keys: 'grid', 'generators', ?
mv_grid : :class:`~.grid.grids.MVGrid`
Medium voltage (MV) grid
generator_scenario : :obj:`str`
Defines which scenario of future generator park to use.
Attributes
----------
results : :class:`~.grid.network.Results`
Object with results from power flow analyses
"""
def __init__(self, **kwargs):
self._scenario_description = kwargs.get('scenario_description', None)
self._config = Config(config_path=kwargs.get('config_path', None))
self._equipment_data = self._load_equipment_data()
self._metadata = kwargs.get('metadata', None)
self._data_sources = kwargs.get('data_sources', {})
self._generator_scenario = kwargs.get('generator_scenario', None)
self._mv_grid = kwargs.get('mv_grid', None)
self._pypsa = None
self.results = Results(self)
self._dingo_import_data = []
def _load_equipment_data(self):
"""Load equipment data for transformers, cables etc.
Returns
-------
:obj:`dict` of :pandas:`pandas.DataFrame<dataframe>`
"""
package_path = edisgo.__path__[0]
equipment_dir = self.config['system_dirs']['equipment_dir']
data = {}
equipment = {'mv': ['trafos', 'lines', 'cables'],
'lv': ['trafos', 'cables']}
for voltage_level, eq_list in equipment.items():
for i in eq_list:
equipment_parameters = self.config['equipment'][
'equipment_{}_parameters_{}'.format(voltage_level, i)]
data['{}_{}'.format(voltage_level, i)] = pd.read_csv(
os.path.join(package_path, equipment_dir,
equipment_parameters),
comment='#', index_col='name',
delimiter=',', decimal='.')
# calculate electrical values of transformer from standard values (so far only for LV transformers, not necessary for MV as MV impedances not used)
if voltage_level == 'lv' and i == 'trafos':
# Simplification of r = R/Z_nom with R = P_k*(U_n)²/S_nom and Z_nom = (U_n)²/S_nom => r = P_k/S_nom
data['{}_{}'.format(voltage_level, i)]['r_pu'] = data['{}_{}'.format(voltage_level, i)]['P_k']/\
(data['{}_{}'.format(voltage_level, i)]['S_nom']*1000)
# x = sqrt(z²-r²) with Simplification of z = Z/Z_nom with Z = u_kr[%]/100 * (U_n)²/S_n and Z_nom = (U_n)²/S_nom => z = u_kr[%]/100
data['{}_{}'.format(voltage_level, i)]['x_pu'] = np.sqrt((data['{}_{}'.format(voltage_level, i)]['u_kr']/100)**2\
-data['{}_{}'.format(voltage_level, i)]['r_pu']**2)
return data
@property
def id(self):
"""
MV grid ID
Returns
--------
:obj:`str`
MV grid ID
"""
return self._id
@property
def config(self):
"""
eDisGo configuration data.
Returns
-------
:class:`~.grid.network.Config`
Config object with configuration data from config files.
"""
return self._config
@config.setter
def config(self, config_path):
self._config = Config(config_path=config_path)
@property
def metadata(self):
"""
Metadata of Network
Returns
--------
:obj:`dict`
Metadata of Network
"""
return self._metadata
@property
def generator_scenario(self):
    """
    Scenario of the future generator park to use.

    Parameters
    ----------
    generator_scenario_name : :obj:`str`
        Name of the scenario of the future generator park.

    Returns
    --------
    :obj:`str`
        Name of the scenario of the future generator park.
    """
    return self._generator_scenario

@generator_scenario.setter
def generator_scenario(self, generator_scenario_name):
    self._generator_scenario = generator_scenario_name
@property
def scenario_description(self):
    """
    Free-text description of the scenario; not used for computation.

    Parameters
    ----------
    scenario_description : :obj:`str`
        Description of the scenario.

    Returns
    --------
    :obj:`str`
        Scenario name.
    """
    return self._scenario_description

@scenario_description.setter
def scenario_description(self, scenario_description):
    self._scenario_description = scenario_description
@property
def equipment_data(self):
    """
    Technical data of electrical equipment such as lines and transformers.

    Returns
    --------
    :obj:`dict` of :pandas:`pandas.DataFrame<dataframe>`
        One DataFrame per equipment category (e.g. 'mv_lines',
        'lv_trafos') with the corresponding technical parameters.
    """
    return self._equipment_data
@property
def mv_grid(self):
    """
    Medium voltage (MV) grid of this network.

    Parameters
    ----------
    mv_grid : :class:`~.grid.grids.MVGrid`
        Medium voltage (MV) grid.

    Returns
    --------
    :class:`~.grid.grids.MVGrid`
        Medium voltage (MV) grid.
    """
    return self._mv_grid

@mv_grid.setter
def mv_grid(self, mv_grid):
    self._mv_grid = mv_grid
@property
def timeseries(self):
    """
    Container for load and feed-in time series.

    Parameters
    ----------
    timeseries : :class:`~.grid.network.TimeSeries`
        Object containing load and feed-in time series.

    Returns
    --------
    :class:`~.grid.network.TimeSeries`
        Object containing load and feed-in time series.
    """
    return self._timeseries

@timeseries.setter
def timeseries(self, timeseries):
    self._timeseries = timeseries
@property
def data_sources(self):
    """
    Data sources of grid, generators etc.

    Returns
    --------
    :obj:`dict` of :obj:`str`
        Mapping from data category (e.g. 'grid') to the source it was
        obtained from.
    """
    return self._data_sources
def set_data_source(self, key, data_source):
    """
    Record the data source used for a given data category.

    Parameters
    ----------
    key : :obj:`str`
        Data category the source refers to (e.g. 'grid').
    data_source : :obj:`str`
        Name of the data source.
    """
    self._data_sources[key] = data_source
@property
def dingo_import_data(self):
    """
    Temporary data from the ding0 import, needed for the OEP generator
    update.
    """
    return self._dingo_import_data

@dingo_import_data.setter
def dingo_import_data(self, dingo_data):
    self._dingo_import_data = dingo_data
@property
def pypsa(self):
    """
    PyPSA grid representation.

    A grid topology representation based on
    :pandas:`pandas.DataFrame<dataframe>`. The overall container object
    of this data model, the :pypsa:`pypsa.Network<network>`, is assigned
    to this attribute.

    Parameters
    ----------
    pypsa : :pypsa:`pypsa.Network<network>`
        The `PyPSA network
        <https://www.pypsa.org/doc/components.html#network>`_ container.

    Returns
    -------
    :pypsa:`pypsa.Network<network>`
        PyPSA grid representation. The attribute `edisgo_mode` is added
        to specify if the pypsa representation of the edisgo network was
        created for the whole grid topology (MV + LV), only MV or only
        LV. See parameter `mode` in
        :meth:`~.grid.network.EDisGo.analyze` for more information.
    """
    return self._pypsa

@pypsa.setter
def pypsa(self, pypsa):
    self._pypsa = pypsa

def __repr__(self):
    # '!s' forces str() conversion, matching the historical
    # 'Network ' + str(self._id) output exactly.
    return 'Network {!s}'.format(self._id)
class Config:
    """
    Container for all configurations.

    Parameters
    -----------
    config_path : None or :obj:`str` or :obj:`dict`
        Path to the config directory. Options are:

        * None
          Configs are loaded from the edisgo default config directory
          ($HOME$/.edisgo). If the directory does not exist it is
          created. If config files don't exist the default config files
          are copied into the directory.
        * :obj:`str`
          Configs will be loaded from the directory specified by
          `config_path`. If the directory does not exist it is created.
          If config files don't exist the default config files are
          copied into the directory.
        * :obj:`dict`
          A dictionary can be used to specify different paths to the
          different config files. The dictionary must have the following
          keys:

          * 'config_db_tables'
          * 'config_grid'
          * 'config_grid_expansion'
          * 'config_timeseries'

          Values of the dictionary are paths to the corresponding config
          file. In contrast to the other two options the directories and
          config files must exist and are not automatically created.

        Default: None.

    Notes
    -----
    The Config object can be used like a dictionary. See example on how
    to use it.

    Examples
    --------
    Create Config object from default config files

    >>> from edisgo.grid.network import Config
    >>> config = Config()

    Get reactive power factor for generators in the MV grid

    >>> config['reactive_power_factor']['mv_gen']
    """

    def __init__(self, **kwargs):
        self._data = self._load_config(kwargs.get('config_path', None))

    @staticmethod
    def _load_config(config_path=None):
        """
        Load config files.

        Parameters
        -----------
        config_path : None or :obj:`str` or dict
            See class definition for more information.

        Returns
        -------
        :obj:`collections.OrderedDict`
            eDisGo configuration data from config files.
        """
        config_files = ['config_db_tables', 'config_grid',
                        'config_grid_expansion', 'config_timeseries']

        # load configs
        if isinstance(config_path, dict):
            # one directory per config file; files must already exist
            for conf in config_files:
                config.load_config(filename='{}.cfg'.format(conf),
                                   config_dir=config_path[conf],
                                   copy_default_config=False)
        else:
            for conf in config_files:
                config.load_config(filename='{}.cfg'.format(conf),
                                   config_dir=config_path)

        config_dict = config.cfg._sections

        # convert numeric values to float; non-numeric values are kept
        # as strings (fix: was a bare `except:` that swallowed every
        # exception, including KeyboardInterrupt/SystemExit)
        for sec, subsecs in config_dict.items():
            for subsec, val in subsecs.items():
                try:
                    config_dict[sec][subsec] = float(val)
                except (ValueError, TypeError):
                    pass

        # convert 'HH:MM' strings to datetime.time objects (seconds are
        # always zero with the '%H:%M' format, so only hour and minute
        # are kept)
        for key in ('day_start', 'day_end'):
            parsed = datetime.datetime.strptime(
                config_dict['demandlib'][key], "%H:%M")
            config_dict['demandlib'][key] = datetime.time(
                parsed.hour, parsed.minute)

        return config_dict

    def __getitem__(self, key1, key2=None):
        """
        Return a whole section (`key2` is None) or a single value.

        Raises
        ------
        KeyError
            If the section or the value does not exist.
        """
        if key2 is None:
            try:
                return self._data[key1]
            except KeyError:
                # fix: narrowed from bare `except:` — only a missing key
                # should be reported as a KeyError
                raise KeyError(
                    "Config does not contain section {}.".format(key1))
        else:
            try:
                return self._data[key1][key2]
            except KeyError:
                raise KeyError("Config does not contain value for {} or "
                               "section {}.".format(key2, key1))

    def __setitem__(self, key, value):
        self._data[key] = value

    def __delitem__(self, key):
        del self._data[key]

    def __iter__(self):
        return iter(self._data)

    def __len__(self):
        return len(self._data)
class TimeSeriesControl:
    """
    Sets up TimeSeries Object.

    Parameters
    ----------
    network : :class:`~.grid.network.Network`
        The eDisGo data container
    mode : :obj:`str`, optional
        Mode must be set in case of worst-case analyses and can either be
        'worst-case' (both feed-in and load case), 'worst-case-feedin'
        (only feed-in case) or 'worst-case-load' (only load case). All
        other parameters except of `config-data` will be ignored.
        Default: None.
    timeseries_generation_fluctuating : :obj:`str` or :pandas:`pandas.DataFrame<dataframe>`, optional
        Parameter used to obtain time series for active power feed-in of
        fluctuating renewables wind and solar. Possible options are:

        * 'oedb'
          Time series for 2011 are obtained from the OpenEnergy DataBase.
          `mv_grid_id` and `scenario_description` have to be provided
          when choosing this option.
        * :pandas:`pandas.DataFrame<dataframe>`
          DataFrame with time series, normalized with corresponding
          capacity. Time series can either be aggregated by technology
          type or by type and weather cell ID. In the first case columns
          of the DataFrame are 'solar' and 'wind'; in the second case
          columns need to be a
          :pandas:`pandas.MultiIndex<multiindex>` with the first level
          containing the type and the second level the weather cell ID.

        Default: None.
    timeseries_generation_dispatchable : :pandas:`pandas.DataFrame<dataframe>`, optional
        DataFrame with time series for active power of each (aggregated)
        type of dispatchable generator normalized with corresponding
        capacity. Columns represent generator type ('gas', 'coal',
        'biomass', 'other', ...). Use 'other' if you don't want to
        explicitly provide every possible type. Default: None.
    timeseries_generation_reactive_power : :pandas:`pandas.DataFrame<dataframe>`, optional
        DataFrame with time series of normalized reactive power
        (normalized by the rated nominal active power) per technology and
        weather cell. Index needs to be a
        :pandas:`pandas.DatetimeIndex<datetimeindex>`. Columns represent
        generator type and can be a MultiIndex column containing the
        weather cell ID in the second level. If the technology doesn't
        contain weather cell information, i.e. if it is other than solar
        and wind generation, this second level can be left as an empty
        string ''. Default: None.
    timeseries_load : :obj:`str` or :pandas:`pandas.DataFrame<dataframe>`, optional
        Parameter used to obtain time series of active power of
        (cumulative) loads. Possible options are:

        * 'demandlib'
          Time series are generated using the oemof demandlib.
        * :pandas:`pandas.DataFrame<dataframe>`
          DataFrame with load time series of each (cumulative) type of
          load normalized with corresponding annual energy demand.
          Columns represent load type ('residential', 'retail',
          'industrial', 'agricultural').

        Default: None.
    timeseries_load_reactive_power : :pandas:`pandas.DataFrame<dataframe>`, optional
        DataFrame with time series of normalized reactive power
        (normalized by annual energy demand) per load sector. Index needs
        to be a :pandas:`pandas.DatetimeIndex<datetimeindex>`. Columns
        represent load type ('residential', 'retail', 'industrial',
        'agricultural'). Default: None.
    timeindex : :pandas:`pandas.DatetimeIndex<datetimeindex>`
        Can be used to define a time range for which to obtain load time
        series and feed-in time series of fluctuating renewables or to
        define time ranges of the given time series that will be used in
        the analysis.
    """

    def __init__(self, network, **kwargs):
        self.timeseries = TimeSeries(network=network)
        mode = kwargs.get('mode', None)
        config_data = network.config
        weather_cell_ids = network.mv_grid.weather_cells

        if mode:
            if mode == 'worst-case':
                modes = ['feedin_case', 'load_case']
            elif mode == 'worst-case-feedin' or mode == 'worst-case-load':
                modes = ['{}_case'.format(mode.split('-')[-1])]
            else:
                raise ValueError('{} is not a valid mode.'.format(mode))

            # set random timeindex (one snapshot per worst case)
            self.timeseries._timeindex = pd.date_range(
                '1/1/1970', periods=len(modes), freq='H')

            self._worst_case_generation(
                config_data['worst_case_scale_factor'], modes)
            self._worst_case_load(
                config_data['worst_case_scale_factor'],
                config_data['peakload_consumption_ratio'],
                modes)
        else:
            # feed-in time series of fluctuating renewables
            ts = kwargs.get('timeseries_generation_fluctuating', None)
            if isinstance(ts, pd.DataFrame):
                self.timeseries.generation_fluctuating = ts
            elif isinstance(ts, str) and ts == 'oedb':
                self.timeseries.generation_fluctuating = \
                    import_feedin_timeseries(config_data,
                                             weather_cell_ids)
            else:
                # fix: dropped no-op .format(mode) call on a message
                # without placeholders
                raise ValueError('Your input for '
                                 '"timeseries_generation_fluctuating" is not '
                                 'valid.')

            # feed-in time series for dispatchable generators
            ts = kwargs.get('timeseries_generation_dispatchable', None)
            if isinstance(ts, pd.DataFrame):
                self.timeseries.generation_dispatchable = ts
            else:
                # a missing time series is only acceptable if the grid
                # contains no dispatchable generators at all
                gens = network.mv_grid.generators + [
                    gen for lv_grid in network.mv_grid.lv_grids
                    for gen in lv_grid.generators]
                if not all(isinstance(g, GeneratorFluctuating)
                           for g in gens):
                    raise ValueError(
                        'Your input for '
                        '"timeseries_generation_dispatchable" '
                        'is not valid.')

            # reactive power time series for all generators
            ts = kwargs.get('timeseries_generation_reactive_power', None)
            if isinstance(ts, pd.DataFrame):
                self.timeseries.generation_reactive_power = ts

            # set time index (default to the index of the fluctuating
            # feed-in time series)
            if kwargs.get('timeindex', None) is not None:
                self.timeseries._timeindex = kwargs.get('timeindex')
            else:
                self.timeseries._timeindex = \
                    self.timeseries._generation_fluctuating.index

            # load time series
            ts = kwargs.get('timeseries_load', None)
            if isinstance(ts, pd.DataFrame):
                self.timeseries.load = ts
            elif ts == 'demandlib':
                self.timeseries.load = import_load_timeseries(
                    config_data, ts,
                    year=self.timeseries.timeindex[0].year)
            else:
                raise ValueError('Your input for "timeseries_load" is not '
                                 'valid.')

            # reactive power timeseries for loads
            ts = kwargs.get('timeseries_load_reactive_power', None)
            if isinstance(ts, pd.DataFrame):
                self.timeseries.load_reactive_power = ts

        # check if time series for the set time index can be obtained
        self._check_timeindex()

    def _check_timeindex(self):
        """
        Check function to check if all feed-in and load time series
        contain values for the specified time index.
        """
        try:
            # accessing the properties raises if the underlying series
            # do not cover the set time index
            self.timeseries.generation_fluctuating
            self.timeseries.generation_dispatchable
            self.timeseries.load
            self.timeseries.generation_reactive_power
            self.timeseries.load_reactive_power
        except Exception:
            # fix: narrowed from bare `except:` (no longer swallows
            # KeyboardInterrupt/SystemExit)
            message = 'Time index of feed-in and load time series does ' \
                      'not match.'
            logging.error(message)
            raise KeyError(message)

    def _worst_case_generation(self, worst_case_scale_factors, modes):
        """
        Define worst case generation time series for fluctuating and
        dispatchable generators.

        Parameters
        ----------
        worst_case_scale_factors : dict
            Scale factors defined in config file 'config_timeseries.cfg'.
            Scale factors describe actual power to nominal power ratio of
            in worst-case scenarios.
        modes : list
            List with worst-cases to generate time series for. Can be
            'feedin_case', 'load_case' or both.
        """
        self.timeseries.generation_fluctuating = pd.DataFrame(
            {'solar': [worst_case_scale_factors[
                '{}_feedin_pv'.format(mode)] for mode in modes],
             'wind': [worst_case_scale_factors[
                 '{}_feedin_other'.format(mode)] for mode in modes]},
            index=self.timeseries.timeindex)

        self.timeseries.generation_dispatchable = pd.DataFrame(
            {'other': [worst_case_scale_factors[
                '{}_feedin_other'.format(mode)] for mode in modes]},
            index=self.timeseries.timeindex)

    def _worst_case_load(self, worst_case_scale_factors,
                         peakload_consumption_ratio, modes):
        """
        Define worst case load time series for each sector.

        Parameters
        ----------
        worst_case_scale_factors : dict
            Scale factors defined in config file 'config_timeseries.cfg'.
            Scale factors describe actual power to nominal power ratio of
            in worst-case scenarios.
        peakload_consumption_ratio : dict
            Ratios of peak load to annual consumption per sector, defined
            in config file 'config_timeseries.cfg'
        modes : list
            List with worst-cases to generate time series for. Can be
            'feedin_case', 'load_case' or both.
        """
        sectors = ['residential', 'retail', 'industrial', 'agricultural']

        lv_power_scaling = np.array(
            [worst_case_scale_factors['lv_{}_load'.format(mode)]
             for mode in modes])
        mv_power_scaling = np.array(
            [worst_case_scale_factors['mv_{}_load'.format(mode)]
             for mode in modes])

        lv = {(sector, 'lv'): peakload_consumption_ratio[sector] *
              lv_power_scaling
              for sector in sectors}
        mv = {(sector, 'mv'): peakload_consumption_ratio[sector] *
              mv_power_scaling
              for sector in sectors}
        self.timeseries.load = pd.DataFrame({**lv, **mv},
                                            index=self.timeseries.timeindex)
class CurtailmentControl:
    """
    Allocates given curtailment targets to solar and wind generators.

    Parameters
    ----------
    edisgo: :class:`edisgo.EDisGo`
        The parent EDisGo object that this instance is a part of.
    methodology : :obj:`str`
        Defines the curtailment strategy. Possible options are:

        * 'feedin-proportional'
          The curtailment that has to be met in each time step is
          allocated equally to all generators depending on their share of
          total feed-in in that time step. For more information see
          :func:`edisgo.flex_opt.curtailment.feedin_proportional`.
        * 'voltage-based'
          The curtailment that has to be met in each time step is
          allocated based on the voltages at the generator connection
          points and a defined voltage threshold. Generators at higher
          voltages are curtailed more. The default voltage threshold is
          1.0 but can be changed by providing the argument
          'voltage_threshold'. This method formulates the allocation of
          curtailment as a linear optimization problem using
          :py:mod:`Pyomo` and requires a linear programming solver like
          coin-or cbc (cbc) or gnu linear programming kit (glpk). The
          solver can be specified through the parameter 'solver'. For
          more information see
          :func:`edisgo.flex_opt.curtailment.voltage_based`.

    curtailment_timeseries : :pandas:`pandas.Series<series>` or :pandas:`pandas.DataFrame<dataframe>`, optional
        Series or DataFrame containing the curtailment time series in kW.
        Index needs to be a
        :pandas:`pandas.DatetimeIndex<datetimeindex>`. Provide a Series
        if the curtailment time series applies to wind and solar
        generators. Provide a DataFrame if the curtailment time series
        applies to a specific technology and optionally weather cell. In
        the first case columns of the DataFrame are e.g. 'solar' and
        'wind'; in the second case columns need to be a
        :pandas:`pandas.MultiIndex<multiindex>` with the first level
        containing the type and the second level the weather cell ID.
        Default: None.
    solver: :obj:`str`
        The solver used to optimize the curtailment assigned to the
        generators when 'voltage-based' curtailment methodology is
        chosen. Possible options are:

        * 'cbc'
        * 'glpk'
        * any other available solver compatible with 'pyomo' such as
          'gurobi' or 'cplex'

        Default: 'cbc'.
    voltage_threshold : :obj:`float`
        Voltage below which no curtailment is assigned to the respective
        generator if not necessary when 'voltage-based' curtailment
        methodology is chosen. See
        :func:`edisgo.flex_opt.curtailment.voltage_based` for more
        information. Default: 1.0.
    """

    def __init__(self, edisgo, methodology, curtailment_timeseries,
                 **kwargs):
        logging.info("Start curtailment methodology {}.".format(
            methodology))

        self._check_timeindex(curtailment_timeseries, edisgo.network)

        if methodology == 'feedin-proportional':
            curtailment_method = curtailment.feedin_proportional
        elif methodology == 'voltage-based':
            curtailment_method = curtailment.voltage_based
        else:
            raise ValueError(
                '{} is not a valid curtailment methodology.'.format(
                    methodology))

        # get all fluctuating generators and their attributes (weather
        # ID, type, etc.)
        generators = get_gen_info(edisgo.network, 'mvlv', fluctuating=True)

        # do analyze to get all voltages at generators and feed-in
        # dataframe
        edisgo.analyze()

        # get feed-in time series of all generators (pypsa values are in
        # MW, convert to kW)
        feedin = edisgo.network.pypsa.generators_t.p * 1000
        # drop dispatchable generators and slack generator
        drop_labels = [_ for _ in feedin.columns
                       if 'GeneratorFluctuating' not in _] \
                      + ['Generator_slack']
        feedin.drop(labels=drop_labels, axis=1, inplace=True)

        if isinstance(curtailment_timeseries, pd.Series):
            # one overall curtailment target for all fluctuating
            # generators
            self._precheck(curtailment_timeseries, feedin,
                           'all_fluctuating_generators')
            curtailment_method(
                feedin, generators, curtailment_timeseries, edisgo,
                'all_fluctuating_generators', **kwargs)
        elif isinstance(curtailment_timeseries, pd.DataFrame):
            # one curtailment target per technology (and weather cell)
            for col in curtailment_timeseries.columns:
                logging.debug('Calculating curtailment for {}'.format(col))

                # filter generators
                if isinstance(curtailment_timeseries.columns,
                              pd.MultiIndex):
                    selected_generators = generators.loc[
                        (generators.type == col[0]) &
                        (generators.weather_cell_id == col[1])]
                else:
                    selected_generators = generators.loc[
                        (generators.type == col)]

                # check if curtailment exceeds feed-in
                feedin_selected_generators = \
                    feedin.loc[:, selected_generators.gen_repr.values]
                self._precheck(curtailment_timeseries.loc[:, col],
                               feedin_selected_generators, col)

                # do curtailment
                if not feedin_selected_generators.empty:
                    curtailment_method(
                        feedin_selected_generators, selected_generators,
                        curtailment_timeseries.loc[:, col], edisgo,
                        col, **kwargs)

        # check if curtailment exceeds feed-in
        self._postcheck(edisgo.network, feedin)

        # update generator time series in pypsa network
        if edisgo.network.pypsa is not None:
            pypsa_io.update_pypsa_generator_timeseries(edisgo.network)

        # add measure to Results object
        edisgo.network.results.measures = 'curtailment'

    def _check_timeindex(self, curtailment_timeseries, network):
        """
        Raises an error if time index of curtailment time series does not
        comply with the time index of load and feed-in time series.

        Parameters
        -----------
        curtailment_timeseries : :pandas:`pandas.Series<series>` or \
            :pandas:`pandas.DataFrame<dataframe>`
            See parameter `curtailment_timeseries` in class definition
            for more information.
        network : :class:`~.grid.network.Network`

        Raises
        ------
        KeyError
            If no curtailment is given or its time index does not cover
            the network's time index.
        """
        if curtailment_timeseries is None:
            message = 'No curtailment given.'
            logging.error(message)
            raise KeyError(message)
        try:
            curtailment_timeseries.loc[network.timeseries.timeindex]
        except Exception:
            # fix: narrowed from bare `except:`; any lookup failure is
            # reported as a KeyError with a descriptive message
            message = 'Time index of curtailment time series does not match ' \
                      'with load and feed-in time series.'
            logging.error(message)
            raise KeyError(message)

    def _precheck(self, curtailment_timeseries, feedin_df, curtailment_key):
        """
        Raises an error if the curtailment at any time step exceeds the
        total feed-in of all generators curtailment can be distributed
        among at that time.

        Parameters
        -----------
        curtailment_timeseries : :pandas:`pandas.Series<series>`
            Curtailment time series in kW for the technology (and weather
            cell) specified in `curtailment_key`.
        feedin_df : :pandas:`pandas.Series<series>`
            Feed-in time series in kW for all generators of type (and in
            weather cell) specified in `curtailment_key`.
        curtailment_key : :obj:`str` or :obj:`tuple` with :obj:`str`
            Technology (and weather cell) curtailment is given for.
        """
        if not feedin_df.empty:
            feedin_selected_sum = feedin_df.sum(axis=1)
            diff = feedin_selected_sum - curtailment_timeseries
            # add tolerance (set small negative values to zero)
            diff[diff.between(-1, 0)] = 0
            if not (diff >= 0).all():
                bad_time_steps = [_ for _ in diff.index
                                  if diff[_] < 0]
                message = 'Curtailment demand exceeds total feed-in in time ' \
                          'steps {}.'.format(bad_time_steps)
                logging.error(message)
                raise ValueError(message)
        else:
            # no generators to curtail: any positive curtailment target
            # is infeasible
            bad_time_steps = [_ for _ in curtailment_timeseries.index
                              if curtailment_timeseries[_] > 0]
            if bad_time_steps:
                message = 'Curtailment given for time steps {} but there ' \
                          'are no generators to meet the curtailment target ' \
                          'for {}.'.format(bad_time_steps, curtailment_key)
                logging.error(message)
                raise ValueError(message)

    def _postcheck(self, network, feedin):
        """
        Raises an error if the curtailment of a generator exceeds the
        feed-in of that generator at any time step.

        Parameters
        -----------
        network : :class:`~.grid.network.Network`
        feedin : :pandas:`pandas.DataFrame<dataframe>`
            DataFrame with feed-in time series in kW. Columns of the
            dataframe are
            :class:`~.grid.components.GeneratorFluctuating`, index is
            time index.
        """
        curtailment = network.timeseries.curtailment
        gen_repr = [repr(_) for _ in curtailment.columns]
        feedin_repr = feedin.loc[:, gen_repr]
        # fix: work on a copy so that renaming the columns below does not
        # mutate the curtailment time series stored on the network
        curtailment_repr = curtailment.copy()
        curtailment_repr.columns = gen_repr
        # tolerance of 0.1 kW to ignore numerical noise
        if not ((feedin_repr - curtailment_repr) > -1e-1).all().all():
            message = 'Curtailment exceeds feed-in.'
            logging.error(message)
            # NOTE(review): TypeError kept for backward compatibility;
            # ValueError (as raised in _precheck) would be more apt.
            raise TypeError(message)
[docs]class StorageControl:
"""
Integrates storages into the grid.
Parameters
----------
edisgo : :class:`~.grid.network.EDisGo`
timeseries : :obj:`str` or :pandas:`pandas.Series<series>` or :obj:`dict`
Parameter used to obtain time series of active power the
storage(s) is/are charged (negative) or discharged (positive) with. Can
either be a given time series or an operation strategy.
Possible options are:
* :pandas:`pandas.Series<series>`
Time series the storage will be charged and discharged with can be
set directly by providing a :pandas:`pandas.Series<series>` with
time series of active charge (negative) and discharge (positive)
power in kW. Index needs to be a
:pandas:`pandas.DatetimeIndex<datetimeindex>`.
If no nominal power for the storage is provided in
`parameters` parameter, the maximum of the time series is
used as nominal power.
In case of more than one storage provide a :obj:`dict` where each
entry represents a storage. Keys of the dictionary have to match
the keys of the `parameters dictionary`, values must
contain the corresponding time series as
:pandas:`pandas.Series<series>`.
* 'fifty-fifty'
Storage operation depends on actual power of generators. If
cumulative generation exceeds 50% of the nominal power, the storage
will charge. Otherwise, the storage will discharge.
If you choose this option you have to provide a nominal power for
the storage. See `parameters` for more information.
Default: None.
position : None or :obj:`str` or :class:`~.grid.components.Station` or :class:`~.grid.components.BranchTee` or :class:`~.grid.components.Generator` or :class:`~.grid.components.Load` or :obj:`dict`
To position the storage a positioning strategy can be used or a
node in the grid can be directly specified. Possible options are:
* 'hvmv_substation_busbar'
Places a storage unit directly at the HV/MV station's bus bar.
* :class:`~.grid.components.Station` or :class:`~.grid.components.BranchTee` or :class:`~.grid.components.Generator` or :class:`~.grid.components.Load`
Specifies a node the storage should be connected to. In the case
this parameter is of type :class:`~.grid.components.LVStation` an
additional parameter, `voltage_level`, has to be provided to define
which side of the LV station the storage is connected to.
* 'distribute_storages_mv'
Places one storage in each MV feeder if it reduces grid expansion
costs. This method needs a given time series of active power.
ToDo: Elaborate
In case of more than one storage provide a :obj:`dict` where each
entry represents a storage. Keys of the dictionary have to match
the keys of the `timeseries` and `parameters`
dictionaries, values must contain the corresponding positioning
strategy or node to connect the storage to.
parameters : :obj:`dict`, optional
Dictionary with the following optional storage parameters:
.. code-block:: python
{
'nominal_power': <float>, # in kW
'max_hours': <float>, # in h
'soc_initial': <float>, # in kWh
'efficiency_in': <float>, # in per unit 0..1
'efficiency_out': <float>, # in per unit 0..1
'standing_loss': <float> # in per unit 0..1
}
See :class:`~.grid.components.Storage` for more information on storage
parameters.
In case of more than one storage provide a :obj:`dict` where each
entry represents a storage. Keys of the dictionary have to match
the keys of the `timeseries` dictionary, values must
contain the corresponding parameters dictionary specified above.
Note: As edisgo currently only provides a power flow analysis storage
parameters don't have any effect on the calculations, except of the
nominal power of the storage.
Default: {}.
voltage_level : :obj:`str` or :obj:`dict`, optional
This parameter only needs to be provided if any entry in `position` is
of type :class:`~.grid.components.LVStation`. In that case
`voltage_level` defines which side of the LV station the storage is
connected to. Valid options are 'lv' and 'mv'.
In case of more than one storage provide a :obj:`dict` specifying the
voltage level for each storage that is to be connected to an LV
station. Keys of the dictionary have to match the keys of the
`timeseries` dictionary, values must contain the corresponding
voltage level.
Default: None.
timeseries_reactive_power : :pandas:`pandas.Series<series>` or :obj:`dict`
By default reactive power is set through the config file
`config_timeseries` in sections `reactive_power_factor` specifying
the power factor and `reactive_power_mode` specifying if inductive
or capacitive reactive power is provided.
If you want to over-write this behavior you can provide a reactive
power time series in kvar here. Be aware that eDisGo uses the generator
sign convention for storages (see `Definitions and units` section of
the documentation for more information). Index of the series needs to
be a :pandas:`pandas.DatetimeIndex<datetimeindex>`.
In case of more than one storage provide a :obj:`dict` where each
entry represents a storage. Keys of the dictionary have to match
the keys of the `timeseries` dictionary, values must contain the
corresponding time series as :pandas:`pandas.Series<series>`.
"""
def __init__(self, edisgo, timeseries, position, **kwargs):
self.edisgo = edisgo
voltage_level = kwargs.pop('voltage_level', None)
parameters = kwargs.pop('parameters', {})
timeseries_reactive_power = kwargs.pop(
'timeseries_reactive_power', None)
if isinstance(timeseries, dict):
# check if other parameters are dicts as well if provided
if voltage_level is not None:
if not isinstance(voltage_level, dict):
message = 'Since storage `timeseries` is a dictionary, ' \
'`voltage_level` has to be provided as a ' \
'dictionary as well.'
logging.error(message)
raise KeyError(message)
if parameters is not None:
if not all(isinstance(value, dict) == True
for value in parameters.values()):
message = 'Since storage `timeseries` is a dictionary, ' \
'storage parameters of each storage have to ' \
'be provided as a dictionary as well.'
logging.error(message)
raise KeyError(message)
if timeseries_reactive_power is not None:
if not isinstance(timeseries_reactive_power, dict):
message = 'Since storage `timeseries` is a dictionary, ' \
'`timeseries_reactive_power` has to be ' \
'provided as a dictionary as well.'
logging.error(message)
raise KeyError(message)
for storage, ts in timeseries.items():
try:
pos = position[storage]
except KeyError:
message = 'Please provide position for storage {}.'.format(
storage)
logging.error(message)
raise KeyError(message)
try:
voltage_lev = voltage_level[storage]
except:
voltage_lev = None
try:
params = parameters[storage]
except:
params = {}
try:
reactive_power = timeseries_reactive_power[storage]
except:
reactive_power = None
self._integrate_storage(ts, pos, params,
voltage_lev, reactive_power, **kwargs)
else:
self._integrate_storage(timeseries, position, parameters,
voltage_level, timeseries_reactive_power,
**kwargs)
# add measure to Results object
self.edisgo.network.results.measures = 'storage_integration'
def _integrate_storage(self, timeseries, position, params, voltage_level,
                       reactive_power_timeseries, **kwargs):
    """
    Integrate a single storage unit into the grid.

    Parameters
    ----------
    timeseries : :obj:`str` or :pandas:`pandas.Series<series>`
        Parameter used to obtain time series of active power the storage
        is charged (negative) or discharged (positive) with. Can either
        be a given time series or an operation strategy
        ('fifty-fifty'). See class definition for more information.
    position : :obj:`str` or :class:`~.grid.components.Station` or :class:`~.grid.components.BranchTee` or :class:`~.grid.components.Generator` or :class:`~.grid.components.Load`
        Parameter used to place the storage. Can either be a grid
        component or one of the strings 'hvmv_substation_busbar' and
        'distribute_storages_mv'. See class definition for more
        information.
    params : :obj:`dict`
        Dictionary with storage parameters for one storage. See class
        definition for more information on what parameters must be
        provided.
    voltage_level : :obj:`str` or None
        `voltage_level` defines which side of the LV station the storage
        is connected to. Valid options are 'lv' and 'mv'. Default: None.
        See class definition for more information.
    reactive_power_timeseries : :pandas:`pandas.Series<series>` or None
        Reactive power time series in kvar (generator sign convention).
        Index of the series needs to be a
        :pandas:`pandas.DatetimeIndex<datetimeindex>`.

    Raises
    ------
    KeyError
        If `position` or `timeseries` is not a valid option.
    ValueError
        If `position` is 'distribute_storages_mv' and `timeseries` is
        not a pandas Series.
    """
    # make sure a nominal power can be assigned to the storage
    params = self._check_nominal_power(params, timeseries)

    # place storage
    if isinstance(position, (Station, BranchTee, Generator, Load)):
        storage = storage_integration.set_up_storage(
            node=position, parameters=params, voltage_level=voltage_level)
        line = storage_integration.connect_storage(storage, position)
    elif isinstance(position, str) \
            and position == 'hvmv_substation_busbar':
        storage, line = storage_integration.storage_at_hvmv_substation(
            self.edisgo.network.mv_grid, params)
    elif isinstance(position, str) \
            and position == 'distribute_storages_mv':
        # check active power time series
        if not isinstance(timeseries, pd.Series):
            raise ValueError(
                "Storage time series needs to be a pandas Series if "
                "`position` is 'distribute_storages_mv'.")
        else:
            timeseries = pd.DataFrame(data={'p': timeseries},
                                      index=timeseries.index)
            self._check_timeindex(timeseries)
        # check reactive power time series
        if reactive_power_timeseries is not None:
            self._check_timeindex(reactive_power_timeseries)
            timeseries['q'] = reactive_power_timeseries.loc[
                timeseries.index]
        else:
            timeseries['q'] = 0
        # start storage positioning method
        storage_positioning.one_storage_per_feeder(
            edisgo=self.edisgo, storage_timeseries=timeseries,
            storage_nominal_power=params['nominal_power'], **kwargs)
        return
    else:
        # report the invalid *position* (the original message wrongly
        # interpolated the time series here)
        message = 'Provided storage position option {} is not ' \
                  'valid.'.format(position)
        logging.error(message)
        raise KeyError(message)

    # implement operation strategy (active power)
    if isinstance(timeseries, pd.Series):
        timeseries = pd.DataFrame(data={'p': timeseries},
                                  index=timeseries.index)
        self._check_timeindex(timeseries)
        storage.timeseries = timeseries
    elif isinstance(timeseries, str) and timeseries == 'fifty-fifty':
        storage_operation.fifty_fifty(self.edisgo.network, storage)
    else:
        message = 'Provided storage timeseries option {} is not ' \
                  'valid.'.format(timeseries)
        logging.error(message)
        raise KeyError(message)

    # reactive power
    if reactive_power_timeseries is not None:
        self._check_timeindex(reactive_power_timeseries)
        storage.timeseries = pd.DataFrame(
            {'p': storage.timeseries.p,
             'q': reactive_power_timeseries.loc[storage.timeseries.index]},
            index=storage.timeseries.index)

    # update pypsa representation
    if self.edisgo.network.pypsa is not None:
        pypsa_io.update_pypsa_storage(
            self.edisgo.network.pypsa,
            storages=[storage], storages_lines=[line])
def _check_nominal_power(self, storage_parameters, timeseries):
"""
Tries to assign a nominal power to the storage.
Checks if nominal power is provided through `storage_parameters`,
otherwise tries to return the absolute maximum of `timeseries`. Raises
an error if it cannot assign a nominal power.
Parameters
----------
timeseries : :obj:`str` or :pandas:`pandas.Series<series>`
See parameter `timeseries` in class definition for more
information.
storage_parameters : :obj:`dict`
See parameter `parameters` in class definition for more
information.
Returns
--------
:obj:`dict`
The given `storage_parameters` is returned extended by an entry for
'nominal_power', if it didn't already have that key.
"""
if storage_parameters.get('nominal_power', None) is None:
try:
storage_parameters['nominal_power'] = max(abs(timeseries))
except:
raise ValueError("Could not assign a nominal power to the "
"storage. Please provide either a nominal "
"power or an active power time series.")
return storage_parameters
def _check_timeindex(self, timeseries):
"""
Raises an error if time index of storage time series does not
comply with the time index of load and feed-in time series.
Parameters
-----------
timeseries : :pandas:`pandas.DataFrame<dataframe>`
DataFrame containing active power the storage is charged (negative)
and discharged (positive) with in kW in column 'p' and
reactive power in kVA in column 'q'.
"""
try:
timeseries.loc[self.edisgo.network.timeseries.timeindex]
except:
message = 'Time index of storage time series does not match ' \
'with load and feed-in time series.'
logging.error(message)
raise KeyError(message)
class TimeSeries:
    """
    Defines time series for all loads and generators in network (if set).

    Contains time series for loads (sector-specific), generators
    (technology-specific), and curtailment (technology-specific).

    Attributes
    ----------
    generation_fluctuating : :pandas:`pandas.DataFrame<dataframe>`, optional
        DataFrame with active power feed-in time series for fluctuating
        renewables solar and wind, normalized with corresponding capacity.
        Time series can either be aggregated by technology type or by type
        and weather cell ID. In the first case columns of the DataFrame are
        'solar' and 'wind'; in the second case columns need to be a
        :pandas:`pandas.MultiIndex<multiindex>` with the first level
        containing the type and the second level the weather cell ID.
        Default: None.
    generation_dispatchable : :pandas:`pandas.DataFrame<dataframe>`, optional
        DataFrame with time series for active power of each (aggregated)
        type of dispatchable generator normalized with corresponding
        capacity. Columns represent generator type:

        * 'gas'
        * 'coal'
        * 'biomass'
        * 'other'
        * ...

        Use 'other' if you don't want to explicitly provide every possible
        type. Default: None.
    generation_reactive_power : :pandas:`pandas.DataFrame<dataframe>`, optional
        DataFrame with reactive power per technology and weather cell ID,
        normalized with the nominal active power.
        Time series can either be aggregated by technology type or by type
        and weather cell ID. In the first case columns of the DataFrame are
        'solar' and 'wind'; in the second case columns need to be a
        :pandas:`pandas.MultiIndex<multiindex>` with the first level
        containing the type and the second level the weather cell ID.
        If the technology doesn't contain weather cell information, i.e.
        if it is other than solar or wind generation, this second level
        can be left as a numpy Nan or a None. Default: None.
    load : :pandas:`pandas.DataFrame<dataframe>`, optional
        DataFrame with active power of load time series of each
        (cumulative) type of load, normalized with corresponding annual
        energy demand. Columns represent load type:

        * 'residential'
        * 'retail'
        * 'industrial'
        * 'agricultural'

        Default: None.
    load_reactive_power : :pandas:`pandas.DataFrame<dataframe>`, optional
        DataFrame with time series of normalized reactive power (normalized
        by annual energy demand) per load sector. Index needs to be a
        :pandas:`pandas.DatetimeIndex<datetimeindex>`.
        Columns represent load type:

        * 'residential'
        * 'retail'
        * 'industrial'
        * 'agricultural'

        Default: None.
    curtailment : :pandas:`pandas.DataFrame<dataframe>` or List, optional
        In the case curtailment is applied to all fluctuating renewables
        this needs to be a DataFrame with active power curtailment time
        series. Time series can either be aggregated by technology type or
        by type and weather cell ID. In the first case columns of the
        DataFrame are 'solar' and 'wind'; in the second case columns need
        to be a :pandas:`pandas.MultiIndex<multiindex>` with the first
        level containing the type and the second level the weather cell ID.
        In the case curtailment is only applied to specific generators,
        this parameter needs to be a list of all generators that are
        curtailed. Default: None.
    timeindex : :pandas:`pandas.DatetimeIndex<datetimeindex>`, optional
        Can be used to define a time range for which to obtain the provided
        time series and run power flow analysis. Default: None.

    See also
    --------
    `timeseries` getter in :class:`~.grid.components.Generator`,
    :class:`~.grid.components.GeneratorFluctuating` and
    :class:`~.grid.components.Load`.
    """

    def __init__(self, network, **kwargs):
        self.network = network
        self._generation_dispatchable = kwargs.get('generation_dispatchable',
                                                   None)
        self._generation_fluctuating = kwargs.get('generation_fluctuating',
                                                  None)
        self._generation_reactive_power = kwargs.get(
            'generation_reactive_power', None)
        self._load = kwargs.get('load', None)
        # the correctly spelled kwarg is preferred; the historic typo
        # 'load_reacitve_power' is still accepted for backward compatibility
        self._load_reactive_power = kwargs.get(
            'load_reactive_power', kwargs.get('load_reacitve_power', None))
        self._curtailment = kwargs.get('curtailment', None)
        self._timeindex = kwargs.get('timeindex', None)
        self._timesteps_load_feedin_case = None

    @property
    def generation_dispatchable(self):
        """
        Get generation time series of dispatchable generators (only active
        power).

        Returns
        -------
        :pandas:`pandas.DataFrame<dataframe>`
            See class definition for details.
        """
        try:
            # works when timeindex is a single label
            return self._generation_dispatchable.loc[[self.timeindex], :]
        except Exception:
            # fall back for a DatetimeIndex (or propagate AttributeError
            # when no time series was set)
            return self._generation_dispatchable.loc[self.timeindex, :]

    @generation_dispatchable.setter
    def generation_dispatchable(self, generation_dispatchable_timeseries):
        self._generation_dispatchable = generation_dispatchable_timeseries

    @property
    def generation_fluctuating(self):
        """
        Get generation time series of fluctuating renewables (only active
        power).

        Returns
        -------
        :pandas:`pandas.DataFrame<dataframe>`
            See class definition for details.
        """
        try:
            return self._generation_fluctuating.loc[[self.timeindex], :]
        except Exception:
            return self._generation_fluctuating.loc[self.timeindex, :]

    @generation_fluctuating.setter
    def generation_fluctuating(self, generation_fluc_timeseries):
        self._generation_fluctuating = generation_fluc_timeseries

    @property
    def generation_reactive_power(self):
        """
        Get reactive power time series for generators normalized by nominal
        active power.

        Returns
        -------
        :pandas:`pandas.DataFrame<dataframe>`
            See class definition for details.
        """
        if self._generation_reactive_power is not None:
            return self._generation_reactive_power.loc[self.timeindex, :]
        else:
            return None

    @generation_reactive_power.setter
    def generation_reactive_power(self, generation_reactive_power_timeseries):
        self._generation_reactive_power = generation_reactive_power_timeseries

    @property
    def load(self):
        """
        Get load time series (only active power).

        Returns
        -------
        dict or :pandas:`pandas.DataFrame<dataframe>`
            See class definition for details.
        """
        try:
            return self._load.loc[[self.timeindex], :]
        except Exception:
            return self._load.loc[self.timeindex, :]

    @load.setter
    def load(self, load_timeseries):
        self._load = load_timeseries

    @property
    def load_reactive_power(self):
        """
        Get reactive power time series for load normalized by annual
        consumption.

        Returns
        -------
        :pandas:`pandas.DataFrame<dataframe>`
            See class definition for details.
        """
        if self._load_reactive_power is not None:
            return self._load_reactive_power.loc[self.timeindex, :]
        else:
            return None

    @load_reactive_power.setter
    def load_reactive_power(self, load_reactive_power_timeseries):
        self._load_reactive_power = load_reactive_power_timeseries

    @property
    def timeindex(self):
        """
        Time range of power flow analysis.

        Returns
        -------
        :pandas:`pandas.DatetimeIndex<datetimeindex>`
            See class definition for details.
        """
        return self._timeindex

    @property
    def curtailment(self):
        """
        Get curtailment time series of dispatchable generators (only active
        power).

        Parameters
        ----------
        curtailment : list or :pandas:`pandas.DataFrame<dataframe>`
            See class definition for details.

        Returns
        -------
        :pandas:`pandas.DataFrame<dataframe>`
            In the case curtailment is applied to all solar and wind
            generators, curtailment time series either aggregated by
            technology type or by type and weather cell ID are returned.
            In the first case columns of the DataFrame are 'solar' and
            'wind'; in the second case columns need to be a
            :pandas:`pandas.MultiIndex<multiindex>` with the first level
            containing the type and the second level the weather cell ID.
            In the case curtailment is only applied to specific generators,
            curtailment time series of all curtailed generators, specified
            by the column name, are returned.
        """
        if self._curtailment is not None:
            if isinstance(self._curtailment, pd.DataFrame):
                try:
                    return self._curtailment.loc[[self.timeindex], :]
                except Exception:
                    return self._curtailment.loc[self.timeindex, :]
            elif isinstance(self._curtailment, list):
                try:
                    curtailment = pd.DataFrame()
                    for gen in self._curtailment:
                        curtailment[gen] = gen.curtailment
                    return curtailment
                except Exception:
                    raise
        else:
            return None

    @curtailment.setter
    def curtailment(self, curtailment):
        self._curtailment = curtailment

    @property
    def timesteps_load_feedin_case(self):
        """
        Contains residual load and information on feed-in and load case.

        Residual load is calculated from total (load - generation) in the
        grid. Grid losses are not considered.

        Feed-in and load case are identified based on the generation and
        load time series and defined as follows:

        1. Load case: positive (load - generation) at HV/MV substation
        2. Feed-in case: negative (load - generation) at HV/MV substation

        See also :func:`~.tools.tools.assign_load_feedin_case`.

        Returns
        -------
        :pandas:`pandas.Series<series>`
            Series with information on whether time step is handled as load
            case ('load_case') or feed-in case ('feedin_case') for each
            time step in :py:attr:`~timeindex`. Index of the dataframe is
            :py:attr:`~timeindex`. Columns of the dataframe are
            'residual_load' with (load - generation) in kW at HV/MV
            substation and 'case' with 'load_case' for positive residual
            load and 'feedin_case' for negative residual load.
        """
        return tools.assign_load_feedin_case(self.network)
[docs]class Results:
"""
Power flow analysis results management
Includes raw power flow analysis results, history of measures to increase
the grid's hosting capacity and information about changes of equipment.
Attributes
----------
network : :class:`~.grid.network.Network`
The network is a container object holding all data.
"""
def __init__(self, network):
self.network = network
self._measures = ['original']
self._pfa_p = None
self._pfa_q = None
self._pfa_v_mag_pu = None
self._i_res = None
self._equipment_changes = pd.DataFrame()
self._grid_expansion_costs = None
self._grid_losses = None
self._hv_mv_exchanges = None
self._curtailment = None
self._storage_integration = None
self._unresolved_issues = {}
self._storages_costs_reduction = None
@property
def measures(self):
"""
List with the history of measures to increase grid's hosting capacity.
Parameters
----------
measure : :obj:`str`
Measure to increase grid's hosting capacity. Possible options are
'grid_expansion', 'storage_integration', 'curtailment'.
Returns
-------
measures : :obj:`list`
A stack that details the history of measures to increase grid's
hosting capacity. The last item refers to the latest measure. The
key `original` refers to the state of the grid topology as it was
initially imported.
"""
return self._measures
@measures.setter
def measures(self, measure):
self._measures.append(measure)
@property
def pfa_p(self):
"""
Active power results from power flow analysis in kW.
Holds power flow analysis results for active power for the last
iteration step. Index of the DataFrame is a DatetimeIndex indicating
the time period the power flow analysis was conducted for; columns
of the DataFrame are the edges as well as stations of the grid
topology.
Parameters
----------
pypsa : :pandas:`pandas.DataFrame<dataframe>`
Results time series of active power P in kW from the
`PyPSA network <https://www.pypsa.org/doc/components.html#network>`_
Provide this if you want to set values. For retrieval of data do
not pass an argument
Returns
-------
:pandas:`pandas.DataFrame<dataframe>`
Active power results from power flow analysis
"""
return self._pfa_p
@pfa_p.setter
def pfa_p(self, pypsa):
self._pfa_p = pypsa
@property
def pfa_q(self):
"""
Reactive power results from power flow analysis in kvar.
Holds power flow analysis results for reactive power for the last
iteration step. Index of the DataFrame is a DatetimeIndex indicating
the time period the power flow analysis was conducted for; columns
of the DataFrame are the edges as well as stations of the grid
topology.
Parameters
----------
pypsa : :pandas:`pandas.DataFrame<dataframe>`
Results time series of reactive power Q in kvar from the
`PyPSA network <https://www.pypsa.org/doc/components.html#network>`_
Provide this if you want to set values. For retrieval of data do not
pass an argument
Returns
-------
:pandas:`pandas.DataFrame<dataframe>`
Reactive power results from power flow analysis
"""
return self._pfa_q
@pfa_q.setter
def pfa_q(self, pypsa):
self._pfa_q = pypsa
@property
def pfa_v_mag_pu(self):
"""
Voltage deviation at node in p.u.
Holds power flow analysis results for relative voltage deviation for
the last iteration step. Index of the DataFrame is a DatetimeIndex
indicating the time period the power flow analysis was conducted for;
columns of the DataFrame are the nodes as well as stations of the grid
topology.
Parameters
----------
pypsa : :pandas:`pandas.DataFrame<dataframe>`
Results time series of voltage deviation in p.u. from the
`PyPSA network <https://www.pypsa.org/doc/components.html#network>`_
Provide this if you want to set values. For retrieval of data do
not pass an argument
Returns
-------
:pandas:`pandas.DataFrame<dataframe>`
Voltage level nodes of grid
"""
return self._pfa_v_mag_pu
@pfa_v_mag_pu.setter
def pfa_v_mag_pu(self, pypsa):
self._pfa_v_mag_pu = pypsa
@property
def i_res(self):
"""
Current results from power flow analysis in A.
Holds power flow analysis results for current for the last
iteration step. Index of the DataFrame is a DatetimeIndex indicating
the time period the power flow analysis was conducted for; columns
of the DataFrame are the edges as well as stations of the grid
topology.
Parameters
----------
pypsa : :pandas:`pandas.DataFrame<dataframe>`
Results time series of current in A from the
`PyPSA network <https://www.pypsa.org/doc/components.html#network>`_
Provide this if you want to set values. For retrieval of data do
not pass an argument
Returns
-------
:pandas:`pandas.DataFrame<dataframe>`
Current results from power flow analysis
"""
return self._i_res
@i_res.setter
def i_res(self, pypsa):
self._i_res = pypsa
@property
def equipment_changes(self):
"""
Tracks changes in the equipment (e.g. replaced or added cable, etc.)
The DataFrame is indexed by the component(
:class:`~.grid.components.Line`, :class:`~.grid.components.Station`,
etc.) and has the following columns:
equipment : detailing what was changed (line, station, storage,
curtailment). For ease of referencing we take the component itself.
For lines we take the line-dict, for stations the transformers, for
storages the storage-object itself and for curtailment
either a dict providing the details of curtailment or a curtailment
object if this makes more sense (has to be defined).
change : :obj:`str`
Specifies if something was added or removed.
iteration_step : :obj:`int`
Used for the update of the pypsa network to only consider changes
since the last power flow analysis.
quantity : :obj:`int`
Number of components added or removed. Only relevant for
calculation of grid expansion costs to keep track of how many
new standard lines were added.
Parameters
----------
changes : :pandas:`pandas.DataFrame<dataframe>`
Provide this if you want to set values. For retrieval of data do
not pass an argument.
Returns
-------
:pandas:`pandas.DataFrame<dataframe>`
Equipment changes
"""
return self._equipment_changes
@equipment_changes.setter
def equipment_changes(self, changes):
self._equipment_changes = changes
@property
def grid_expansion_costs(self):
"""
Holds grid expansion costs in kEUR due to grid expansion measures
tracked in self.equipment_changes and calculated in
edisgo.flex_opt.costs.grid_expansion_costs()
Parameters
----------
total_costs : :pandas:`pandas.DataFrame<dataframe>`
DataFrame containing type and costs plus in the case of lines the
line length and number of parallel lines of each reinforced
transformer and line. Provide this if you want to set
grid_expansion_costs. For retrieval of costs do not pass an
argument.
Index of the DataFrame is the respective object
that can either be a :class:`~.grid.components.Line` or a
:class:`~.grid.components.Transformer`. Columns are the following:
type : :obj:`str`
Transformer size or cable name
total_costs : :obj:`float`
Costs of equipment in kEUR. For lines the line length and
number of parallel lines is already included in the total
costs.
quantity : :obj:`int`
For transformers quantity is always one, for lines it specifies
the number of parallel lines.
line_length : :obj:`float`
Length of line or in case of parallel lines all lines in km.
voltage_level : :obj:`str`
Specifies voltage level the equipment is in ('lv', 'mv' or
'mv/lv').
mv_feeder : :class:`~.grid.components.Line`
First line segment of half-ring used to identify in which
feeder the grid expansion was conducted in.
Returns
-------
:pandas:`pandas.DataFrame<dataframe>`
Costs of each reinforced equipment in kEUR.
Notes
-------
Total grid expansion costs can be obtained through
costs.total_costs.sum().
"""
return self._grid_expansion_costs
@grid_expansion_costs.setter
def grid_expansion_costs(self, total_costs):
self._grid_expansion_costs = total_costs
@property
def grid_losses(self):
"""
Holds active and reactive grid losses in kW and kvar, respectively.
Parameters
----------
pypsa_grid_losses : :pandas:`pandas.DataFrame<dataframe>`
Dataframe holding active and reactive grid losses in columns 'p'
and 'q' and in kW and kvar, respectively. Index is a
:pandas:`pandas.DatetimeIndex<datetimeindex>`.
Returns
-------
:pandas:`pandas.DataFrame<dataframe>`
Dataframe holding active and reactive grid losses in columns 'p'
and 'q' and in kW and kvar, respectively. Index is a
:pandas:`pandas.DatetimeIndex<datetimeindex>`.
Notes
------
Grid losses are calculated as follows:
.. math::
P_{loss} = \sum{feed-in} - \sum{load} + P_{slack}
Q_{loss} = \sum{feed-in} - \sum{load} + Q_{slack}
As the slack is placed at the secondary side of the HV/MV station
losses do not include losses of the HV/MV transformers.
"""
return self._grid_losses
@grid_losses.setter
def grid_losses(self, pypsa_grid_losses):
self._grid_losses = pypsa_grid_losses
@property
def hv_mv_exchanges(self):
"""
Holds active and reactive power exchanged with the HV grid.
The exchanges are essentially the slack results. As the slack is placed
at the secondary side of the HV/MV station, this gives the energy
transferred to and taken from the HV grid at the secondary side of the
HV/MV station.
Parameters
----------
hv_mv_exchanges : :pandas:`pandas.DataFrame<dataframe>`
Dataframe holding active and reactive power exchanged with the HV
grid in columns 'p' and 'q' and in kW and kvar, respectively. Index
is a :pandas:`pandas.DatetimeIndex<datetimeindex>`.
Returns
-------
:pandas:`pandas.DataFrame<dataframe>
Dataframe holding active and reactive power exchanged with the HV
grid in columns 'p' and 'q' and in kW and kvar, respectively. Index
is a :pandas:`pandas.DatetimeIndex<datetimeindex>`.
"""
return self._hv_mv_exchanges
@hv_mv_exchanges.setter
def hv_mv_exchanges(self, hv_mv_exchanges):
self._hv_mv_exchanges = hv_mv_exchanges
@property
def curtailment(self):
"""
Holds curtailment assigned to each generator per curtailment target.
Returns
-------
:obj:`dict` with :pandas:`pandas.DataFrame<dataframe>`
Keys of the dictionary are generator types (and weather cell ID)
curtailment targets were given for. E.g. if curtailment is provided
as a :pandas:`pandas.DataFrame<dataframe>` with
:pandas.`pandas.MultiIndex` columns with levels 'type' and
'weather cell ID' the dictionary key is a tuple of
('type','weather_cell_id').
Values of the dictionary are dataframes with the curtailed power in
kW per generator and time step. Index of the dataframe is a
:pandas:`pandas.DatetimeIndex<datetimeindex>`. Columns are the
generators of type
:class:`edisgo.grid.components.GeneratorFluctuating`.
"""
if self._curtailment is not None:
result_dict = {}
for key, gen_list in self._curtailment.items():
curtailment_df = pd.DataFrame()
for gen in gen_list:
curtailment_df[gen] = gen.curtailment
result_dict[key] = curtailment_df
return result_dict
else:
return None
@property
def storages(self):
    """
    Gathers relevant storage results.

    Returns
    -------
    :pandas:`pandas.DataFrame<dataframe>`
        Dataframe containing all storage units installed in the MV grid
        and LV grids. Index of the dataframe are the storage
        representatives, columns are the following:

        nominal_power : :obj:`float`
            Nominal power of the storage in kW.
        voltage_level : :obj:`str`
            Voltage level the storage is connected to. Can either be 'mv'
            or 'lv'.
    """
    columns = {'storage_id': [], 'nominal_power': [],
               'voltage_level': [], 'grid_connection_point': []}
    all_grids = [self.network.mv_grid] + list(
        self.network.mv_grid.lv_grids)
    for grid in all_grids:
        level = 'mv' if isinstance(grid, MVGrid) else 'lv'
        for storage in grid.graph.nodes_by_attribute('storage'):
            columns['storage_id'].append(repr(storage))
            columns['nominal_power'].append(storage.nominal_power)
            columns['voltage_level'].append(level)
            columns['grid_connection_point'].append(
                list(grid.graph.neighbors(storage))[0])
    return pd.DataFrame(columns).set_index('storage_id')
[docs] def storages_timeseries(self):
"""
Returns a dataframe with storage time series.
Returns
-------
:pandas:`pandas.DataFrame<dataframe>`
Dataframe containing time series of all storages installed in the
MV grid and LV grids. Index of the dataframe is a
:pandas:`pandas.DatetimeIndex<datetimeindex>`. Columns are the
storage representatives.
"""
storages_p = pd.DataFrame()
storages_q = pd.DataFrame()
grids = [self.network.mv_grid] + list(self.network.mv_grid.lv_grids)
for grid in grids:
for storage in grid.graph.nodes_by_attribute('storage'):
ts = storage.timeseries
storages_p[repr(storage)] = ts.p
storages_q[repr(storage)] = ts.q
return storages_p, storages_q
@property
def storages_costs_reduction(self):
"""
Contains costs reduction due to storage integration.
Parameters
----------
costs_df : :pandas:`pandas.DataFrame<dataframe>`
Dataframe containing grid expansion costs in kEUR before and after
storage integration in columns 'grid_expansion_costs_initial' and
'grid_expansion_costs_with_storages', respectively. Index of the
dataframe is the MV grid id.
Returns
-------
:pandas:`pandas.DataFrame<dataframe>`
Dataframe containing grid expansion costs in kEUR before and after
storage integration in columns 'grid_expansion_costs_initial' and
'grid_expansion_costs_with_storages', respectively. Index of the
dataframe is the MV grid id.
"""
return self._storages_costs_reduction
@storages_costs_reduction.setter
def storages_costs_reduction(self, costs_df):
self._storages_costs_reduction = costs_df
@property
def unresolved_issues(self):
"""
Holds lines and nodes where over-loading or over-voltage issues
could not be solved in grid reinforcement.
In case over-loading or over-voltage issues could not be solved
after maximum number of iterations, grid reinforcement is not
aborted but grid expansion costs are still calculated and unresolved
issues listed here.
Parameters
----------
issues : dict
Dictionary of critical lines/stations with relative over-loading
and critical nodes with voltage deviation in p.u.. Format:
.. code-block:: python
{crit_line_1: rel_overloading_1, ...,
crit_line_n: rel_overloading_n,
crit_node_1: v_mag_pu_node_1, ...,
crit_node_n: v_mag_pu_node_n}
Provide this if you want to set unresolved_issues. For retrieval
of unresolved issues do not pass an argument.
Returns
-------
Dictionary
Dictionary of critical lines/stations with relative over-loading
and critical nodes with voltage deviation in p.u.
"""
return self._unresolved_issues
@unresolved_issues.setter
def unresolved_issues(self, issues):
self._unresolved_issues = issues
[docs] def s_res(self, components=None):
"""
Get resulting apparent power in kVA at line(s) and transformer(s).
The apparent power at a line (or transformer) is determined from the
maximum values of active power P and reactive power Q.
.. math::
S = max(\sqrt{p_0^2 + q_0^2}, \sqrt{p_1^2 + q_1^2})
Parameters
----------
components : :obj:`list`
List with all components (of type :class:`~.grid.components.Line`
or :class:`~.grid.components.Transformer`) to get apparent power
for. If not provided defaults to return apparent power of all lines
and transformers in the grid.
Returns
-------
:pandas:`pandas.DataFrame<dataframe>`
Apparent power in kVA for lines and/or transformers.
"""
if components is not None:
labels_included = []
labels_not_included = []
labels = [repr(l) for l in components]
for label in labels:
if (label in list(self.pfa_p.columns) and
label in list(self.pfa_q.columns)):
labels_included.append(label)
else:
labels_not_included.append(label)
if labels_not_included:
logging.warning(
"Apparent power for {lines} are not returned from "
"PFA".format(lines=labels_not_included))
else:
labels_included = self.pfa_p.columns
s_res = ((self.pfa_p[labels_included] ** 2 + self.pfa_q[
labels_included] ** 2)).applymap(sqrt)
return s_res
[docs] def v_res(self, nodes=None, level=None):
"""
Get voltage results (in p.u.) from power flow analysis.
Parameters
----------
nodes : :class:`~.grid.components.Load`, \
:class:`~.grid.components.Generator`, etc. or :obj:`list`
Grid topology component or list of grid topology components.
If not provided defaults to column names available in grid level
`level`.
level : str
Either 'mv' or 'lv' or None (default). Depending on which grid
level results you are interested in. It is required to provide this
argument in order to distinguish voltage levels at primary and
secondary side of the transformer/LV station.
If not provided (respectively None) defaults to ['mv', 'lv'].
Returns
-------
:pandas:`pandas.DataFrame<dataframe>`
Resulting voltage levels obtained from power flow analysis
Notes
-----
Limitation: When power flow analysis is performed for MV only
(with aggregated LV loads and generators) this methods only returns
voltage at secondary side busbar and not at load/generator.
"""
# First check if results are available:
if hasattr(self, 'pfa_v_mag_pu'):
# unless index is lexsorted, it cannot be sliced
self.pfa_v_mag_pu.sort_index(axis=1, inplace=True)
else:
message = "No Power Flow Calculation has be done yet, " \
"so there are no results yet."
raise AttributeError(message)
if level is None:
level = ['mv', 'lv']
if nodes is None:
return self.pfa_v_mag_pu.loc[:, (level, slice(None))]
else:
labels = list(map(repr, list(nodes).copy()))
not_included = [_ for _ in labels
if _ not in list(self.pfa_v_mag_pu[level].columns)]
labels_included = [_ for _ in labels if _ not in not_included]
if not_included:
logging.warning("Voltage levels for {nodes} are not returned "
"from PFA".format(
nodes=not_included))
return self.pfa_v_mag_pu[level][labels_included]
def save(self, directory, parameters='all'):
    """
    Saves results to disk.

    Depending on which results are selected and if they exist, the
    following directories and files are created:

    * `powerflow_results` directory

      * `voltages_pu.csv`
        See :py:attr:`~pfa_v_mag_pu` for more information.
      * `currents.csv`
        See :func:`~i_res` for more information.
      * `active_powers.csv`
        See :py:attr:`~pfa_p` for more information.
      * `reactive_powers.csv`
        See :py:attr:`~pfa_q` for more information.
      * `apparent_powers.csv`
        See :func:`~s_res` for more information.
      * `grid_losses.csv`
        See :py:attr:`~grid_losses` for more information.
      * `hv_mv_exchanges.csv`
        See :py:attr:`~hv_mv_exchanges` for more information.

    * `pypsa_network` directory

      See :py:func:`pypsa.Network.export_to_csv_folder`

    * `grid_expansion_results` directory

      * `grid_expansion_costs.csv`
        See :py:attr:`~grid_expansion_costs` for more information.
      * `equipment_changes.csv`
        See :py:attr:`~equipment_changes` for more information.
      * `unresolved_issues.csv`
        See :py:attr:`~unresolved_issues` for more information.

    * `curtailment_results` directory

      Files depend on curtailment specifications. There will be one file
      for each curtailment specification, that is for every key in
      :py:attr:`~curtailment` dictionary.

    * `storage_integration_results` directory

      * `storages.csv`
        See :func:`~storages` for more information.

    In addition, `measures.csv` and `configs.csv` are always written to
    `directory` itself.

    Parameters
    ----------
    directory : :obj:`str`
        Directory to save the results in.
    parameters : :obj:`str` or :obj:`list` of :obj:`str`
        Specifies which results will be saved. By default all results are
        saved. To only save certain results set `parameters` to one of the
        following options or choose several options by providing a list:

        * 'pypsa_network'
        * 'powerflow_results'
        * 'grid_expansion_results'
        * 'curtailment_results'
        * 'storage_integration_results'

    Raises
    ------
    KeyError
        If `parameters` contains an option not listed above.

    """

    def _save_power_flow_results(target_dir):
        # Only written if a power flow was conducted (voltage results
        # exist); all other power flow time series exist alongside them.
        if self.pfa_v_mag_pu is not None:
            # create directory
            os.makedirs(target_dir, exist_ok=True)

            # voltage
            self.pfa_v_mag_pu.to_csv(
                os.path.join(target_dir, 'voltages_pu.csv'))
            # current
            self.i_res.to_csv(
                os.path.join(target_dir, 'currents.csv'))
            # active power
            self.pfa_p.to_csv(
                os.path.join(target_dir, 'active_powers.csv'))
            # reactive power
            self.pfa_q.to_csv(
                os.path.join(target_dir, 'reactive_powers.csv'))
            # apparent power
            self.s_res().to_csv(
                os.path.join(target_dir, 'apparent_powers.csv'))
            # grid losses
            self.grid_losses.to_csv(
                os.path.join(target_dir, 'grid_losses.csv'))
            # grid exchanges
            self.hv_mv_exchanges.to_csv(
                os.path.join(target_dir, 'hv_mv_exchanges.csv'))

    def _save_pypsa_network(target_dir):
        # Exports the pypsa representation of the grid, if one was created.
        if self.network.pypsa:
            # create directory
            os.makedirs(target_dir, exist_ok=True)
            self.network.pypsa.export_to_csv_folder(target_dir)

    def _save_grid_expansion_results(target_dir):
        # Only written if grid expansion was conducted (costs exist).
        if self.grid_expansion_costs is not None:
            # create directory
            os.makedirs(target_dir, exist_ok=True)

            # grid expansion costs
            self.grid_expansion_costs.to_csv(
                os.path.join(target_dir, 'grid_expansion_costs.csv'))
            # unresolved issues
            pd.DataFrame(self.unresolved_issues).to_csv(
                os.path.join(target_dir, 'unresolved_issues.csv'))
            # equipment changes
            self.equipment_changes.to_csv(
                os.path.join(target_dir, 'equipment_changes.csv'))

    def _save_curtailment_results(target_dir):
        # One file per curtailment specification; a tuple key (generator
        # type, weather cell id) is encoded as 'type-id' in the file name.
        if self.curtailment is not None:
            # create directory
            os.makedirs(target_dir, exist_ok=True)

            for key, curtailment_df in self.curtailment.items():
                if isinstance(key, tuple):
                    type_prefix = '-'.join([key[0], str(key[1])])
                elif isinstance(key, str):
                    type_prefix = key
                else:
                    raise KeyError("Unknown key type {} for key {}".format(
                        type(key), key))

                filename = os.path.join(
                    target_dir, '{}.csv'.format(type_prefix))
                curtailment_df.to_csv(filename, index_label=type_prefix)

    def _save_storage_integration_results(target_dir):
        # Writes storage parameters, time series and, if available, the
        # costs reduction achieved through storage integration.
        storages = self.storages
        if not storages.empty:
            # create directory
            os.makedirs(target_dir, exist_ok=True)

            # general storage information
            storages.to_csv(os.path.join(target_dir, 'storages.csv'))

            # storages time series
            ts_p, ts_q = self.storages_timeseries()
            ts_p.to_csv(os.path.join(
                target_dir, 'storages_active_power.csv'))
            ts_q.to_csv(os.path.join(
                target_dir, 'storages_reactive_power.csv'))

            if self.storages_costs_reduction is not None:
                self.storages_costs_reduction.to_csv(
                    os.path.join(target_dir,
                                 'storages_costs_reduction.csv'))

    # dictionary with function to call to save each parameter
    func_dict = {
        'powerflow_results': _save_power_flow_results,
        'pypsa_network': _save_pypsa_network,
        'grid_expansion_results': _save_grid_expansion_results,
        'curtailment_results': _save_curtailment_results,
        'storage_integration_results': _save_storage_integration_results
    }

    # if string is given convert to list
    if isinstance(parameters, str):
        if parameters == 'all':
            parameters = list(func_dict.keys())
        else:
            parameters = [parameters]

    # save each parameter; validate the parameter name before calling the
    # handler so that KeyErrors raised *inside* a handler (e.g. for an
    # unknown curtailment key) are not mistaken for an invalid parameter
    for parameter in parameters:
        save_func = func_dict.get(parameter)
        if save_func is None:
            message = "Invalid input {} for `parameters` when saving " \
                      "results. Must be any or a list of the following: " \
                      "'pypsa_network', 'powerflow_results', " \
                      "'grid_expansion_results', 'curtailment_results', " \
                      "'storage_integration_results'.".format(parameter)
            logger.error(message)
            raise KeyError(message)
        save_func(os.path.join(directory, parameter))

    # save measures
    pd.DataFrame(data={'measure': self.measures}).to_csv(
        os.path.join(directory, 'measures.csv'))

    # save configs: one row per config section, holding the section name
    # followed by alternating key/value pairs
    with open(os.path.join(directory, 'configs.csv'), 'w') as f:
        writer = csv.writer(f)
        rows = [
            ['{}'.format(key)] + [value for item in values.items()
                                  for value in item]
            for key, values in self.network.config._data.items()]
        writer.writerows(rows)
class NetworkReimport:
    """
    Network class created from saved results.

    Reads the configs saved by :meth:`Results.save`, optionally re-imports
    the saved pypsa network and creates a :class:`ResultsReimport` object
    from the remaining saved results.

    Parameters
    ----------
    results_path : :obj:`str`
        Directory the results were saved in.

    Other Parameters
    ----------------
    parameters : :obj:`str` or :obj:`list` of :obj:`str`
        Specifies which results to import. Defaults to 'all'. See
        `parameters` in :meth:`Results.save` for possible options.

    """
    def __init__(self, results_path, **kwargs):
        # import configs; each row holds the section name followed by
        # alternating key/value pairs (see Results.save)
        self.config = {}
        # use os.path.join for portability and consistency with the rest
        # of the module (was a hard-coded '/'-separated format string)
        with open(os.path.join(results_path, 'configs.csv'), 'r') as f:
            reader = csv.reader(f)
            for row in reader:
                a = iter(row[1:])
                self.config[row[0]] = dict(zip(a, a))

        parameters = kwargs.get('parameters', 'all')

        # import pypsa network, if it was saved and requested
        if ('pypsa_network' in parameters or parameters == 'all') and \
                os.path.isdir(os.path.join(results_path, 'pypsa_network')):
            self.pypsa = PyPSANetwork()
            self.pypsa.import_from_csv_folder(
                os.path.join(results_path, 'pypsa_network'))
        else:
            self.pypsa = None

        # create ResultsReimport class
        self.results = ResultsReimport(
            results_path, parameters=parameters)
class ResultsReimport:
    """
    Results class created from saved results.

    Reads back the csv files written by :meth:`Results.save`. Results that
    were not requested via `parameters` or are not present on disk are set
    to None.

    Parameters
    ----------
    results_path : :obj:`str`
        Directory the results were saved in.
    parameters : :obj:`str` or :obj:`list` of :obj:`str`
        Specifies which results to import. By default all available
        results are imported. Possible options are 'powerflow_results',
        'grid_expansion_results', 'curtailment_results' and
        'storage_integration_results'. Default: 'all'.

    """
    def __init__(self, results_path, parameters='all'):
        # measures
        measures_df = pd.read_csv(os.path.join(results_path, 'measures.csv'),
                                  index_col=0)
        self.measures = list(measures_df.measure.values)

        # if string is given convert to list
        if isinstance(parameters, str):
            if parameters == 'all':
                parameters = ['powerflow_results', 'grid_expansion_results',
                              'curtailment_results',
                              'storage_integration_results']
            else:
                parameters = [parameters]

        # import power flow results
        if 'powerflow_results' in parameters and os.path.isdir(os.path.join(
                results_path, 'powerflow_results')):
            powerflow_dir = os.path.join(results_path, 'powerflow_results')
            # line loading
            self.i_res = pd.read_csv(
                os.path.join(powerflow_dir, 'currents.csv'),
                index_col=0, parse_dates=True)
            # voltage (two-level header: grid level / node)
            self.pfa_v_mag_pu = pd.read_csv(
                os.path.join(powerflow_dir, 'voltages_pu.csv'),
                index_col=0, parse_dates=True, header=[0, 1])
            # active power
            self.pfa_p = pd.read_csv(
                os.path.join(powerflow_dir, 'active_powers.csv'),
                index_col=0, parse_dates=True)
            # reactive power
            self.pfa_q = pd.read_csv(
                os.path.join(powerflow_dir, 'reactive_powers.csv'),
                index_col=0, parse_dates=True)
            # apparent power
            self.apparent_power = pd.read_csv(
                os.path.join(powerflow_dir, 'apparent_powers.csv'),
                index_col=0, parse_dates=True)
            # grid losses
            self.grid_losses = pd.read_csv(
                os.path.join(powerflow_dir, 'grid_losses.csv'),
                index_col=0, parse_dates=True)
            # grid exchanges
            self.hv_mv_exchanges = pd.read_csv(
                os.path.join(powerflow_dir, 'hv_mv_exchanges.csv'),
                index_col=0, parse_dates=True)
        else:
            self.i_res = None
            self.pfa_v_mag_pu = None
            self.pfa_p = None
            self.pfa_q = None
            self.apparent_power = None
            self.grid_losses = None
            self.hv_mv_exchanges = None

        # import grid expansion results
        if 'grid_expansion_results' in parameters and os.path.isdir(
                os.path.join(results_path, 'grid_expansion_results')):
            expansion_dir = os.path.join(
                results_path, 'grid_expansion_results')
            # grid expansion costs
            self.grid_expansion_costs = pd.read_csv(
                os.path.join(expansion_dir, 'grid_expansion_costs.csv'),
                index_col=0)
            # equipment changes
            self.equipment_changes = pd.read_csv(
                os.path.join(expansion_dir, 'equipment_changes.csv'),
                index_col=0)
        else:
            self.grid_expansion_costs = None
            self.equipment_changes = None

        # import curtailment results
        if 'curtailment_results' in parameters and os.path.isdir(
                os.path.join(results_path, 'curtailment_results')):
            curtailment_dir = os.path.join(
                results_path, 'curtailment_results')
            self.curtailment = {}
            for file in os.listdir(curtailment_dir):
                if file.endswith(".csv"):
                    try:
                        key = file[0:-4]
                        if '-' in key:
                            # make tuple if curtailment was given for
                            # generator type and weather cell id
                            tmp = key.split('-')
                            key = (tmp[0], float(tmp[1]))
                        self.curtailment[key] = pd.read_csv(
                            os.path.join(curtailment_dir, file),
                            index_col=0, parse_dates=True)
                    except Exception as e:
                        logging.warning(
                            'The following error occured when trying to '
                            'import curtailment results: {}'.format(e))
        else:
            self.curtailment = None

        # import storage results
        if 'storage_integration_results' in parameters and os.path.isdir(
                os.path.join(results_path, 'storage_integration_results')):
            storage_dir = os.path.join(
                results_path, 'storage_integration_results')
            # storages
            self.storages = pd.read_csv(
                os.path.join(storage_dir, 'storages.csv'),
                index_col=0)
            # storages costs reduction (optional file - only written when
            # storage positioning computed it)
            try:
                self.storages_costs_reduction = pd.read_csv(
                    os.path.join(
                        storage_dir, 'storages_costs_reduction.csv'),
                    index_col=0)
            except FileNotFoundError:
                # bug fix: a bare `except: pass` previously left this
                # attribute undefined when the file was missing
                self.storages_costs_reduction = None
            # storages active power time series
            self.storages_p = pd.read_csv(
                os.path.join(storage_dir, 'storages_active_power.csv'),
                index_col=0, parse_dates=True)
            # storages reactive power time series
            self.storages_q = pd.read_csv(
                os.path.join(storage_dir, 'storages_reactive_power.csv'),
                index_col=0, parse_dates=True)
        else:
            self.storages = None
            self.storages_costs_reduction = None
            self.storages_p = None
            self.storages_q = None

    def v_res(self, nodes=None, level=None):
        """
        Get resulting voltage level at node.

        Parameters
        ----------
        nodes : :obj:`list`
            List of string representatives of grid topology components, e.g.
            :class:`~.grid.components.Generator`. If not provided defaults to
            all nodes available in grid level `level`.
        level : :obj:`str`
            Either 'mv' or 'lv' or None (default). Depending on which grid
            level results you are interested in. It is required to provide
            this argument in order to distinguish voltage levels at primary
            and secondary side of the transformer/LV station.
            If not provided (respectively None) defaults to ['mv', 'lv'].

        Returns
        -------
        :pandas:`pandas.DataFrame<dataframe>`
            Resulting voltage levels obtained from power flow analysis

        Raises
        ------
        AttributeError
            If no power flow voltage results were imported.

        """
        # check if voltages are available (bug fix: the previous
        # `hasattr` check was always true since __init__ sets the
        # attribute, raising a confusing NoneType error instead)
        if self.pfa_v_mag_pu is None:
            message = "No voltage results available."
            raise AttributeError(message)
        self.pfa_v_mag_pu.sort_index(axis=1, inplace=True)

        if level is None:
            level = ['mv', 'lv']

        if nodes is None:
            return self.pfa_v_mag_pu.loc[:, (level, slice(None))]
        else:
            not_included = [
                _ for _ in nodes
                if _ not in list(self.pfa_v_mag_pu[level].columns)]
            labels_included = [_ for _ in nodes if _ not in not_included]

            if not_included:
                logging.warning("Voltage levels for {nodes} are not returned "
                                "from PFA".format(nodes=not_included))
            return self.pfa_v_mag_pu[level][labels_included]

    def s_res(self, components=None):
        """
        Get apparent power in kVA at line(s) and transformer(s).

        Parameters
        ----------
        components : :obj:`list`
            List of string representatives of :class:`~.grid.components.Line`
            or :class:`~.grid.components.Transformer`. If not provided
            defaults to return apparent power of all lines and transformers
            in the grid.

        Returns
        -------
        :pandas:`pandas.DataFrame<dataframe>`
            Apparent power in kVA for lines and/or transformers.

        """
        if components is None:
            return self.apparent_power
        else:
            not_included = [_ for _ in components
                            if _ not in self.apparent_power.index]
            labels_included = [_ for _ in components if _ not in not_included]

            if not_included:
                logging.warning(
                    "No apparent power results available for: {}".format(
                        not_included))
            return self.apparent_power.loc[:, labels_included]

    def storages_timeseries(self):
        """
        Returns storage time series.

        Returns
        -------
        :obj:`tuple` of :pandas:`pandas.DataFrame<dataframe>`
            Tuple of two dataframes containing active and reactive power
            time series of all storages installed in the MV grid and LV
            grids. Index of each dataframe is a
            :pandas:`pandas.DatetimeIndex<datetimeindex>`. Columns are the
            storage representatives. Either element is None if no storage
            results were imported.

        """
        return self.storages_p, self.storages_q