from __future__ import annotations
import logging
from typing import TYPE_CHECKING
import numpy as np
import pandas as pd
import saio
from sqlalchemy.engine.base import Engine
from edisgo.io.db import session_scope_egon_data
from edisgo.io.timeseries_import import _timeindex_helper_func
from edisgo.tools import tools
if TYPE_CHECKING:
from edisgo import EDisGo
logger = logging.getLogger(__name__)
def oedb(
    edisgo_obj: EDisGo,
    scenario: str,
    engine: Engine,
    timeindex=None,
):
    """
    Gets industrial and CTS DSM profiles from the
    `OpenEnergy DataBase <https://openenergy-platform.org/dataedit/schemas>`_.

    Profiles comprise minimum and maximum load increase in MW as well as maximum energy
    pre- and postponing in MWh.

    Parameters
    ----------
    edisgo_obj : :class:`~.EDisGo`
    scenario : str
        Scenario for which to retrieve DSM data. Possible options
        are 'eGon2035' and 'eGon100RE'.
    engine : :sqlalchemy:`sqlalchemy.Engine<sqlalchemy.engine.Engine>`
        Database engine.
    timeindex : :pandas:`pandas.DatetimeIndex<DatetimeIndex>` or None
        Specifies time steps for which to return data. Leap years can currently
        not be handled. In case the given timeindex contains a leap year, the data will
        be indexed using the default year (2035 in case of the 'eGon2035' and to 2045
        in case of the 'eGon100RE' scenario) and returned for the whole year.
        If no timeindex is provided, the timeindex set in
        :py:attr:`~.network.timeseries.TimeSeries.timeindex` is used.
        If :py:attr:`~.network.timeseries.TimeSeries.timeindex` is not set, the data
        is indexed using the default year and returned for the whole year.

    Returns
    --------
    dict(str, :pandas:`pandas.DataFrame<DataFrame>`)
        Dictionary with DSM data with keys `p_min`, `p_max`, `e_min` and `e_max` (see
        :class:`~.network.dsm.DSM` for more information). Values contain dataframes with
        DSM profiles per load for one year in an hourly resolution in MW. Index of the
        dataframes are time indices. Columns contain the load name the DSM profile is
        associated with as in index of :attr:`~.network.topology.Topology.loads_df`.

    """
    # get CTS DSM profiles (already disaggregated per CTS load in the grid)
    dsm_cts = get_profile_cts(edisgo_obj, scenario, engine)
    # get industrial DSM profiles for the industrial loads in the grid
    ind_loads = edisgo_obj.topology.loads_df[
        (edisgo_obj.topology.loads_df.type == "conventional_load")
        & (edisgo_obj.topology.loads_df.sector == "industry")
    ]
    dsm_ind = get_profiles_per_industrial_load(
        ind_loads.building_id.unique(), scenario, engine
    )
    # series mapping building ID to load name (index of loads_df), used to rename
    # the columns of the industrial DSM dataframes from building IDs to load names
    rename_series = (
        ind_loads.loc[:, ["building_id"]]
        .dropna()
        .reset_index()
        .set_index("building_id")
        .iloc[:, 0]
    )
    timeindex, timeindex_full = _timeindex_helper_func(
        edisgo_obj,
        timeindex,
        default_year=tools.get_year_based_on_scenario(scenario),
        allow_leap_year=False,
    )
    # rename industrial DSM profiles, join with CTS profiles and set time index
    dsm_ind_cts = {}
    for dsm_profile in ["e_min", "e_max", "p_min", "p_max"]:
        dsm_ind[dsm_profile].rename(columns=rename_series, inplace=True)
        dsm_ind_cts_tmp = pd.concat(
            [dsm_cts[dsm_profile], dsm_ind[dsm_profile]], axis=1
        )
        # data from the database covers a full year; index it with the full-year
        # index first, then select the requested time steps
        dsm_ind_cts_tmp.index = timeindex_full
        dsm_ind_cts[dsm_profile] = dsm_ind_cts_tmp.loc[timeindex, :]
    return dsm_ind_cts
def get_profiles_per_industrial_load(
    load_ids,
    scenario: str,
    engine: Engine,
):
    """
    Gets industrial DSM profiles per site and OSM area.

    Parameters
    ----------
    load_ids : list(int)
        List of industrial site and OSM IDs to retrieve DSM profiles for.
    scenario : str
        Scenario for which to retrieve DSM data. Possible options
        are 'eGon2035' and 'eGon100RE'.
    engine : :sqlalchemy:`sqlalchemy.Engine<sqlalchemy.engine.Engine>`
        Database engine.

    Returns
    --------
    dict(str, :pandas:`pandas.DataFrame<DataFrame>`)
        Dictionary with DSM data with keys `p_min`, `p_max`, `e_min` and `e_max`. Values
        contain dataframes with DSM profiles per site and OSM area for one year in an
        hourly resolution in MW. Index contains hour of the year (from 0 to 8759) and
        column names are site ID as integer.

    """
    saio.register_schema("demand", engine)
    from saio.demand import (
        egon_demandregio_sites_ind_electricity_dsm_timeseries as sites_ind_dsm_ts,
    )
    from saio.demand import (
        egon_osm_ind_load_curves_individual_dsm_timeseries,
        egon_sites_ind_load_curves_individual_dsm_timeseries,
    )

    # industrial DSM potential is spread over three database tables (individual
    # industrial sites from two data sources plus OSM landuse areas); query each
    # with the site/OSM ID labelled uniformly as "site_id"
    with session_scope_egon_data(engine) as session:
        query = session.query(
            egon_sites_ind_load_curves_individual_dsm_timeseries.site_id,
            egon_sites_ind_load_curves_individual_dsm_timeseries.p_min,
            egon_sites_ind_load_curves_individual_dsm_timeseries.p_max,
            egon_sites_ind_load_curves_individual_dsm_timeseries.e_min,
            egon_sites_ind_load_curves_individual_dsm_timeseries.e_max,
        ).filter(
            egon_sites_ind_load_curves_individual_dsm_timeseries.scn_name == scenario,
            egon_sites_ind_load_curves_individual_dsm_timeseries.site_id.in_(load_ids),
        )
        df_sites_1 = pd.read_sql(sql=query.statement, con=engine)

    with session_scope_egon_data(engine) as session:
        query = session.query(
            sites_ind_dsm_ts.industrial_sites_id.label("site_id"),
            sites_ind_dsm_ts.p_min,
            sites_ind_dsm_ts.p_max,
            sites_ind_dsm_ts.e_min,
            sites_ind_dsm_ts.e_max,
        ).filter(
            sites_ind_dsm_ts.scn_name == scenario,
            sites_ind_dsm_ts.industrial_sites_id.in_(load_ids),
        )
        df_sites_2 = pd.read_sql(sql=query.statement, con=engine)

    with session_scope_egon_data(engine) as session:
        query = session.query(
            egon_osm_ind_load_curves_individual_dsm_timeseries.osm_id.label("site_id"),
            egon_osm_ind_load_curves_individual_dsm_timeseries.p_min,
            egon_osm_ind_load_curves_individual_dsm_timeseries.p_max,
            egon_osm_ind_load_curves_individual_dsm_timeseries.e_min,
            egon_osm_ind_load_curves_individual_dsm_timeseries.e_max,
        ).filter(
            egon_osm_ind_load_curves_individual_dsm_timeseries.scn_name == scenario,
            egon_osm_ind_load_curves_individual_dsm_timeseries.osm_id.in_(load_ids),
        )
        df_areas = pd.read_sql(sql=query.statement, con=engine)

    df = pd.concat([df_sites_1, df_sites_2, df_areas])
    # each database row holds a full year of hourly values as an array; attach
    # matching hour-of-year indices (0..8759) so the arrays can be exploded row-wise
    df["time_step"] = len(df) * [np.arange(0, 8760)]
    # un-nest time series data and pivot so that time_step becomes index and
    # site_id column names
    dsm_dict = {}
    for col in ["p_min", "p_max", "e_min", "e_max"]:
        dsm_dict[col] = _pivot_helper(df, col)
    return dsm_dict
def get_profile_cts(
    edisgo_obj: EDisGo,
    scenario: str,
    engine: Engine,
):
    """
    Gets CTS DSM profiles for all CTS loads in the MV grid.

    Parameters
    ----------
    edisgo_obj : :class:`~.EDisGo`
    scenario : str
        Scenario for which to retrieve DSM data. Possible options
        are 'eGon2035' and 'eGon100RE'.
    engine : :sqlalchemy:`sqlalchemy.Engine<sqlalchemy.engine.Engine>`
        Database engine.

    Returns
    --------
    dict(str, :pandas:`pandas.DataFrame<DataFrame>`)
        Dictionary with DSM data with keys `p_min`, `p_max`, `e_min` and `e_max`. Values
        contain dataframes with DSM profiles per CTS load for one year in an
        hourly resolution in MW. Index contains hour of the year (from 0 to 8759) and
        column names are site ID as integer.

    Notes
    ------
    Be aware, that in this function the DSM time series are disaggregated to all CTS
    loads in the grid. In some cases, this can lead to an over- or underestimation of
    the DSM potential, as in egon_data buildings are mapped to a grid based on the
    zensus cell they are in whereas in ding0 buildings are mapped to a grid based on
    the geolocation. As it can happen that buildings lie outside an MV grid but within
    a zensus cell that is assigned to that MV grid, they are mapped differently in
    egon_data and ding0.

    """
    saio.register_schema("demand", engine)
    from saio.demand import egon_etrago_electricity_cts_dsm_timeseries

    # get the grid-aggregated CTS DSM profile for this MV grid's bus
    with session_scope_egon_data(engine) as session:
        query = session.query(
            egon_etrago_electricity_cts_dsm_timeseries.bus.label("site_id"),
            egon_etrago_electricity_cts_dsm_timeseries.p_min,
            egon_etrago_electricity_cts_dsm_timeseries.p_max,
            egon_etrago_electricity_cts_dsm_timeseries.e_min,
            egon_etrago_electricity_cts_dsm_timeseries.e_max,
        ).filter(
            egon_etrago_electricity_cts_dsm_timeseries.scn_name == scenario,
            egon_etrago_electricity_cts_dsm_timeseries.bus == edisgo_obj.topology.id,
        )
        df = pd.read_sql(sql=query.statement, con=engine)

    # each database row holds a full year of hourly values as an array; attach
    # matching hour-of-year indices (0..8759) so the arrays can be exploded row-wise
    df["time_step"] = len(df) * [np.arange(0, 8760)]
    # un-nest time series data and pivot so that time_step becomes index and
    # site_id column names
    dsm_dict = {}
    for col in ["p_min", "p_max", "e_min", "e_max"]:
        dsm_dict[col] = _pivot_helper(df, col)

    # distribute the grid-level DSM potential over all CTS loads in the grid,
    # proportionally to their set (peak) load p_set
    cts_loads = edisgo_obj.topology.loads_df[
        (edisgo_obj.topology.loads_df.type == "conventional_load")
        & (edisgo_obj.topology.loads_df.sector == "cts")
    ]
    if not dsm_dict["p_min"].empty:
        if len(cts_loads) == 0:
            raise ValueError("There is CTS DSM potential but no CTS loads.")
        p_set = cts_loads["p_set"]
        # per-load share of the grid-level profile; a plain 2-D row vector is
        # used instead of the deprecated np.matrix so the matrix product yields
        # an ordinary ndarray of shape (8760, number of CTS loads)
        weights = p_set.to_numpy() / p_set.sum()
        for dsm_ts in ["p_min", "p_max", "e_min", "e_max"]:
            dsm_dict[dsm_ts] = pd.DataFrame(
                data=np.matmul(dsm_dict[dsm_ts].values, weights[np.newaxis, :]),
                index=dsm_dict[dsm_ts].index,
                columns=p_set.index,
            )
    return dsm_dict
def _pivot_helper(df_db, col):
df = (
df_db.loc[:, ["site_id", col, "time_step"]]
.explode([col, "time_step"])
.astype({col: "float"})
)
df = df.pivot(index="time_step", columns="site_id", values=col)
return df