Source code for edisgo.network.heat

from __future__ import annotations

import logging
import os

from zipfile import ZipFile

import numpy as np
import pandas as pd

from edisgo.io import heat_pump_import, timeseries_import
from edisgo.tools import tools

logger = logging.getLogger(__name__)


[docs] class HeatPump: """ Data container for all heat pump data. This class holds data on heat pump COP, heat demand time series, and thermal storage data. """ def __init__(self, **kwargs): pass @property def cop_df(self): """ DataFrame with COP time series of heat pumps. Parameters ----------- df : :pandas:`pandas.DataFrame<DataFrame>` DataFrame with COP time series of heat pumps in p.u.. Index of the dataframe is a time index and should contain all time steps given in :attr:`~.network.timeseries.TimeSeries.timeindex`. Column names are names of heat pumps as in :attr:`~.network.topology.Topology.loads_df`. Returns ------- :pandas:`pandas.DataFrame<DataFrame>` DataFrame with COP time series of heat pumps in p.u.. For more information on the dataframe see input parameter `df`. """ try: return self._cop_df except Exception: return pd.DataFrame() @cop_df.setter def cop_df(self, df): self._cop_df = df @property def heat_demand_df(self): """ DataFrame with heat demand time series of heat pumps. Parameters ----------- df : :pandas:`pandas.DataFrame<DataFrame>` DataFrame with heat demand time series of heat pumps in MW. Index of the dataframe is a time index and should contain all time steps given in :attr:`~.network.timeseries.TimeSeries.timeindex`. Column names are names of heat pumps as in :attr:`~.network.topology.Topology.loads_df`. Returns ------- :pandas:`pandas.DataFrame<DataFrame>` DataFrame with heat demand time series of heat pumps in MW. For more information on the dataframe see input parameter `df`. """ try: return self._heat_demand_df except Exception: return pd.DataFrame() @heat_demand_df.setter def heat_demand_df(self, df): self._heat_demand_df = df @property def thermal_storage_units_df(self): """ DataFrame with heat pump's thermal storage information. Parameters ----------- df : :pandas:`pandas.DataFrame<DataFrame>` DataFrame with thermal storage information. Index of the dataframe are names of heat pumps as in :attr:`~.network.topology.Topology.loads_df`. Columns of the dataframe are: capacity : float Thermal storage capacity in MWh. efficiency : float Charging and discharging efficiency in p.u.. state_of_charge_initial : float Initial state of charge in p.u.. Returns ------- :pandas:`pandas.DataFrame<DataFrame>` DataFrame with thermal storage information. For more information on the dataframe see input parameter `df`. """ try: return self._thermal_storage_units_df except Exception: return pd.DataFrame( columns=["capacity", "efficiency", "state_of_charge_initial"] ) @thermal_storage_units_df.setter def thermal_storage_units_df(self, df): self._thermal_storage_units_df = df
[docs] def set_cop(self, edisgo_object, ts_cop, **kwargs): """ Write COP time series for heat pumps to py:attr:`~cop_df`. COP time series can either be given to this function or be obtained from the `OpenEnergy DataBase <https://openenergy-platform.org/dataedit/schemas>`_. In case they are obtained from the OpenEnergy DataBase the heat pumps need to already be integrated into the grid, i.e. given in :attr:`~.network.topology.Topology.loads_df`. In case COP time series are set for heat pumps that were already assigned a COP time series, their existing COP time series is overwritten by this function. Parameters ---------- edisgo_object : :class:`~.EDisGo` ts_cop : str or :pandas:`pandas.DataFrame<DataFrame>` Defines option used to set COP time series. Possible options are: * 'oedb' COP / efficiency data are obtained from the `OpenEnergy DataBase <https://openenergy-platform.org/dataedit/schemas>`_. In case of heat pumps weather cell specific hourly COP time series are obtained (see :func:`edisgo.io.timeseries_import.cop_oedb` for more information). Using information on which weather cell each heat pump is in, the weather cell specific time series are mapped to each heat pump. In case of resistive heaters a constant efficiency is set (see :func:`edisgo.io.heat_pump_import.efficiency_resistive_heaters_oedb`). Weather cell information of heat pumps is obtained from column 'weather_cell_id' in :attr:`~.network.topology.Topology.loads_df`. In case no heat pump has weather cell information, this function will throw an error. In case only some heat pumps are missing weather cell information, a random existing weather cell is used to fill missing information. This option requires that the parameter `engine` is provided as keyword argument. For further settings, the parameters `timeindex` and `heat_pump_names` can also be provided as keyword arguments. * :pandas:`pandas.DataFrame<DataFrame>` DataFrame with self-provided COP time series per heat pump. See :py:attr:`~cop_df` on information on the required dataframe format. Other Parameters ------------------ engine : :sqlalchemy:`sqlalchemy.Engine<sqlalchemy.engine.Engine>` Database engine. This parameter is required in case `ts_cop` is 'oedb'. heat_pump_names : list(str) or None Defines for which heat pumps to set COP time series in case `ts_cop` is 'oedb'. If None, all heat pumps in :attr:`~.network.topology.Topology.loads_df` (type is 'heat_pump') are used. Default: None. timeindex : :pandas:`pandas.DatetimeIndex<DatetimeIndex>` or None Specifies time steps for which to set data in case `ts_cop` is 'oedb'. Leap years can currently not be handled. In case the given timeindex contains a leap year, the data will be indexed using the default year 2011 and returned for the whole year. If no timeindex is provided, the timeindex set in :py:attr:`~.network.timeseries.TimeSeries.timeindex` is used. If :py:attr:`~.network.timeseries.TimeSeries.timeindex` is not set, the data is indexed using the default year 2011 and returned for the whole year. """ if isinstance(ts_cop, str) and ts_cop == "oedb": heat_pump_names = kwargs.get("heat_pump_names", None) # get heat_pump_names in case they are not specified if heat_pump_names is None: heat_pump_names = edisgo_object.topology.loads_df[ edisgo_object.topology.loads_df.type == "heat_pump" ].index pth_df = edisgo_object.topology.loads_df.loc[heat_pump_names, :] hp_df = pth_df[ ~pth_df.sector.isin( [ "individual_heating_resistive_heater", "district_heating_resistive_heater", ] ) ] # set COP of heat pumps if len(hp_df) > 0: # check weather cell information of heat pumps # if no heat pump has weather cell information, throw an error if ( "weather_cell_id" not in hp_df.columns or hp_df.weather_cell_id.isna().all() ): raise ValueError( "In order to obtain COP time series data from database " "information on weather cells (expected in column " "'weather_cell_id' in Topology.loads_df) is needed, but none " "is given." ) # in case only some heat pumps have missing weather cell information, # give a warning and use random weather cell ID to fill missing # information if hp_df.weather_cell_id.isna().any(): logger.warning( "There are heat pumps with no weather cell ID. They are " "assigned a weather cell ID from another heat pump." ) random_weather_cell_id = hp_df.weather_cell_id.dropna().unique()[0] hp_without_weather_cell = hp_df[hp_df.weather_cell_id.isna()].index # random weather cell ID is not written to loads_df! hp_df.loc[ hp_without_weather_cell, "weather_cell_id" ] = random_weather_cell_id weather_cells = hp_df.weather_cell_id.dropna().unique() # get COP per weather cell ts_cop_per_weather_cell = timeseries_import.cop_oedb( edisgo_object=edisgo_object, engine=kwargs.get("engine", None), weather_cell_ids=weather_cells, timeindex=kwargs.get("timeindex", None), ) # assign COP time series to each heat pump cop_df = pd.DataFrame( data={ _: ts_cop_per_weather_cell.loc[ :, hp_df.at[_, "weather_cell_id"] ] for _ in hp_df.index } ) else: cop_df = pd.DataFrame() # set efficiency of resistive heaters rh_df = pth_df[ pth_df.sector.isin( [ "individual_heating_resistive_heater", "district_heating_resistive_heater", ] ) ] if len(rh_df) > 0: # get efficiencies of resistive heaters eta_dict = heat_pump_import.efficiency_resistive_heaters_oedb( scenario="eGon2035", # currently only possible scenario engine=kwargs.get("engine", None), ) # determine timeindex to use if not cop_df.empty: timeindex = cop_df.index else: timeindex, _ = timeseries_import._timeindex_helper_func( edisgo_object, kwargs.get("timeindex", None), default_year=2011, allow_leap_year=False, ) # assign efficiency time series to each heat pump eta_df = pd.DataFrame( data={ _: ( eta_dict["central_resistive_heater"] if rh_df.at[_, "sector"] == "district_heating_resistive_heater" else eta_dict["rural_resistive_heater"] ) for _ in rh_df.index }, index=timeindex, ) else: eta_df = pd.DataFrame() cop_df = pd.concat([cop_df, eta_df], axis=1) elif isinstance(ts_cop, pd.DataFrame): cop_df = ts_cop else: raise ValueError("'ts_cop' must either be a pandas DataFrame or 'oedb'.") # concat new COP time series with existing ones and drop any duplicate entries self.cop_df = tools.drop_duplicated_columns( pd.concat([self.cop_df, cop_df], axis=1) )
[docs] def set_heat_demand(self, edisgo_object, ts_heat_demand, **kwargs): """ Write heat demand time series of heat pumps to py:attr:`~heat_demand_df`. Heat demand time series can either be given to this function or be obtained from the `OpenEnergy DataBase <https://openenergy-platform.org/dataedit/schemas>`_. In case they are obtained from the OpenEnergy DataBase the heat pumps need to already be integrated into the grid, i.e. given in :attr:`~.network.topology.Topology.loads_df`. In case heat demand time series are set for heat pumps that were already assigned a heat demand time series, their existing heat demand time series is overwritten by this function. Parameters ---------- edisgo_object : :class:`~.EDisGo` ts_heat_demand : str or :pandas:`pandas.DataFrame<DataFrame>` Defines option used to set heat demand time series. Possible options are: * 'oedb' Heat demand time series are obtained from the `OpenEnergy DataBase <https://openenergy-platform.org/dataedit/schemas>`_ (see :func:`edisgo.io.timeseries_import.heat_demand_oedb` for more information). Time series are only obtained for heat pumps that are already integrated into the grid. This option requires that the parameters `engine` and `scenario` are provided as keyword arguments. For further settings, the parameters `timeindex` and `heat_pump_names` can also be provided as keyword arguments. * :pandas:`pandas.DataFrame<DataFrame>` DataFrame with self-provided heat demand time series per heat pump. See :py:attr:`~heat_demand_df` for information on the required dataframe format. Other Parameters ------------------ scenario : str Scenario for which to retrieve heat demand data. This parameter is required in case `ts_heat_demand` is 'oedb'. Possible options are 'eGon2035' and 'eGon100RE'. engine : :sqlalchemy:`sqlalchemy.Engine<sqlalchemy.engine.Engine>` Database engine. This parameter is required in case `ts_heat_demand` is 'oedb'. heat_pump_names : list(str) or None Defines for which heat pumps to get heat demand time series for in case `ts_heat_demand` is 'oedb'. If None, all heat pumps in :attr:`~.network.topology.Topology.loads_df` (type is 'heat_pump') are used. Default: None. timeindex : :pandas:`pandas.DatetimeIndex<DatetimeIndex>` or None Specifies time steps for which to set data in case `ts_heat_demand` is 'oedb'. Leap years can currently not be handled. In case the given timeindex contains a leap year, the data will be indexed using the default year (2035 in case of the 'eGon2035' and to 2045 in case of the 'eGon100RE' scenario) and returned for the whole year. If no timeindex is provided, the timeindex set in :py:attr:`~.network.timeseries.TimeSeries.timeindex` is used. If :py:attr:`~.network.timeseries.TimeSeries.timeindex` is not set, the data is indexed using the default year and returned for the whole year. """ # in case time series from oedb are used, retrieve oedb time series if isinstance(ts_heat_demand, str) and ts_heat_demand == "oedb": heat_pump_names = kwargs.get("heat_pump_names", None) # get heat_pump_names in case they are not specified if heat_pump_names is None: heat_pump_names = edisgo_object.topology.loads_df[ edisgo_object.topology.loads_df.type == "heat_pump" ].index if len(heat_pump_names) > 0: # get heat demand per heat pump heat_demand_df = timeseries_import.heat_demand_oedb( edisgo_object, scenario=kwargs.get("scenario", ""), engine=kwargs.get("engine", None), timeindex=kwargs.get("timeindex", None), ) heat_pump_names_select = [ _ for _ in heat_demand_df.columns if _ in heat_pump_names ] heat_demand_df = heat_demand_df.loc[:, heat_pump_names_select] else: heat_demand_df = pd.DataFrame() elif isinstance(ts_heat_demand, pd.DataFrame): heat_demand_df = ts_heat_demand else: raise ValueError( "'ts_heat_demand' must either be a pandas DataFrame or 'oedb'." ) # concat new COP time series with existing ones and drop any duplicate entries self.heat_demand_df = tools.drop_duplicated_columns( pd.concat([self.heat_demand_df, heat_demand_df], axis=1) )
[docs] def reduce_memory(self, attr_to_reduce=None, to_type="float32"): """ Reduces size of dataframes to save memory. See :attr:`~.edisgo.EDisGo.reduce_memory` for more information. Parameters ----------- attr_to_reduce : list(str), optional List of attributes to reduce size for. Per default, the following attributes are reduced if they exist: cop_df, heat_demand_df. to_type : str, optional Data type to convert time series data to. This is a tradeoff between precision and memory. Default: "float32". """ if attr_to_reduce is None: attr_to_reduce = self._timeseries_attributes for attr in attr_to_reduce: setattr( self, attr, getattr(self, attr).apply(lambda _: _.astype(to_type)), )
def _get_matching_dict_of_attributes_and_file_names(self): """ Helper function that matches attribute names to file names. Is used in functions :py:attr:`~to_csv` and :py:attr:`~from_csv` to set which attribute of :class:`~.network.heat.HeatPump` is saved under which file name. Returns ------- dict Dictionary matching attribute names and file names with attribute names as keys and corresponding file names as values. """ return { "cop_df": "cop.csv", "heat_demand_df": "heat_demand.csv", "thermal_storage_units_df": "thermal_storage_units.csv", } @property def _timeseries_attributes(self): return ["heat_demand_df", "cop_df"]
[docs] def to_csv(self, directory, reduce_memory=False, **kwargs): """ Exports heat pump data to csv files. The following attributes are exported: * 'cop_df' Attribute :py:attr:`~cop_df` is saved to `cop.csv`. * 'heat_demand_df' Attribute :py:attr:`~heat_demand_df` is saved to `heat_demand.csv`. * 'thermal_storage_units_df' Attribute :py:attr:`~thermal_storage_units_df` is saved to `thermal_storage_units.csv`. Parameters ---------- directory : str Path to save data to. reduce_memory : bool, optional If True, size of dataframes is reduced using :attr:`~.network.heat.HeatPump.reduce_memory`. Optional parameters of :attr:`~.network.heat.HeatPump.reduce_memory` can be passed as kwargs to this function. Default: False. Other Parameters ------------------ kwargs : Kwargs may contain arguments of :attr:`~.network.heat.HeatPump.reduce_memory`. """ if reduce_memory is True: self.reduce_memory(**kwargs) os.makedirs(directory, exist_ok=True) attrs = self._get_matching_dict_of_attributes_and_file_names() for attr, file in attrs.items(): df = getattr(self, attr) if not df.empty: path = os.path.join(directory, file) df.to_csv(path)
[docs] def from_csv(self, data_path, from_zip_archive=False): """ Restores heat pump data from csv files. Parameters ---------- data_path : str Path to heat pump csv files. from_zip_archive : bool, optional Set True if data is archived in a zip archive. Default: False """ attrs = self._get_matching_dict_of_attributes_and_file_names() if from_zip_archive: # read from zip archive # setup ZipFile Class zip = ZipFile(data_path) # get all directories and files within zip archive files = zip.namelist() # add directory and .csv to files to match zip archive attrs = {k: f"heat_pump/{v}" for k, v in attrs.items()} else: # read from directory # check files within the directory files = os.listdir(data_path) attrs_to_read = {k: v for k, v in attrs.items() if v in files} for attr, file in attrs_to_read.items(): if from_zip_archive: # open zip file to make it readable for pandas with zip.open(file) as f: df = pd.read_csv(f, index_col=0, parse_dates=True) else: path = os.path.join(data_path, file) df = pd.read_csv(path, index_col=0, parse_dates=True) setattr(self, attr, df) if from_zip_archive: # make sure to destroy ZipFile Class to close any open connections zip.close()
[docs] def resample_timeseries( self, method: str = "ffill", freq: str | pd.Timedelta = "15min" ): """ Resamples COP and heat demand time series to a desired resolution. Both up- and down-sampling methods are possible. Parameters ---------- method : str, optional See :attr:`~.EDisGo.resample_timeseries` for more information. freq : str, optional See :attr:`~.EDisGo.resample_timeseries` for more information. """ for attr in self._timeseries_attributes: attr_index = getattr(self, attr).index if len(attr_index) < 2: logger.debug( f"{attr} cannot be resampled as it contains less than two " f"time steps." ) else: freq_orig = attr_index[1] - attr_index[0] tools.resample(self, freq_orig, method, freq, attr_to_resample=[attr])
[docs] def check_integrity(self): """ Check data integrity. Checks for duplicated and missing labels as well as implausible values. """ check_dfs = ["heat_demand_df", "cop_df"] # check for duplicate columns for ts in check_dfs: df = getattr(self, ts) duplicated_labels = df.columns[df.columns.duplicated()].values if len(duplicated_labels) > 0: logger.warning( f"HeatPump timeseries {ts} contains the following duplicates: " f"{set(duplicated_labels)}." ) # check that all profiles exist for the same heat pumps columns = set(np.concatenate([getattr(self, _).columns for _ in check_dfs])) for ts in check_dfs: df = getattr(self, ts) missing_entries = [_ for _ in columns if _ not in df.columns] if len(missing_entries) > 0: logger.warning( f"HeatPump timeseries {ts} is missing the following " f"entries: {missing_entries}." )