Source code for edisgo.io.generators_import

from __future__ import annotations

import logging
import os
import random

from typing import TYPE_CHECKING

import numpy as np
import pandas as pd
import saio

from sqlalchemy import func
from sqlalchemy.engine.base import Engine

from edisgo.io.db import get_srid_of_db_table, session_scope_egon_data
from edisgo.tools import session_scope
from edisgo.tools.geo import find_nearest_bus, proj2equidistant
from edisgo.tools.tools import (
    determine_bus_voltage_level,
    determine_grid_integration_voltage_level,
)

if "READTHEDOCS" not in os.environ:
    import geopandas as gpd

    from egoio.db_tables import model_draft, supply
    from shapely.ops import transform
    from shapely.wkt import loads as wkt_loads

if TYPE_CHECKING:
    from edisgo import EDisGo

logger = logging.getLogger(__name__)


[docs] def oedb_legacy(edisgo_object, generator_scenario, **kwargs): """ Gets generator park for specified scenario from oedb and integrates generators into the grid. The importer uses SQLAlchemy ORM objects. These are defined in `ego.io <https://github.com/openego/ego.io/tree/dev/egoio/db_tables/>`_. The data is imported from the tables `conventional power plants <https://openenergy-platform.org/dataedit/\ view/supply/ego_dp_conv_powerplant>`_ and `renewable power plants <https://openenergy-platform.org/dataedit/\ view/supply/ego_dp_res_powerplant>`_. When the generator data is retrieved, the following steps are conducted: * Step 1: Update capacity of existing generators if `update_existing` is True, which it is by default. * Step 2: Remove decommissioned generators if `remove_decommissioned` is True, which it is by default. * Step 3: Integrate new MV generators. * Step 4: Integrate new LV generators. For more information on how generators are integrated, see :attr:`~.network.topology.Topology.connect_to_mv` and :attr:`~.network.topology.Topology.connect_to_lv`. Parameters ---------- edisgo_object : :class:`~.EDisGo` generator_scenario : str Scenario for which to retrieve generator data. Possible options are 'nep2035' and 'ego100'. Other Parameters ---------------- remove_decommissioned : bool If True, removes generators from network that are not included in the imported dataset (=decommissioned). Default: True. update_existing : bool If True, updates capacity of already existing generators to capacity specified in the imported dataset. Default: True. p_target : dict or None Per default, no target capacity is specified and generators are expanded as specified in the respective scenario. However, you may want to use one of the scenarios but have slightly more or less generation capacity than given in the respective scenario. In that case you can specify the desired target capacity per technology type using this input parameter. The target capacity dictionary must have technology types (e.g. 'wind' or 'solar') as keys and corresponding target capacities in MW as values. If a target capacity is given that is smaller than the total capacity of all generators of that type in the future scenario, only some generators in the future scenario generator park are installed, until the target capacity is reached. If the given target capacity is greater than that of all generators of that type in the future scenario, then each generator capacity is scaled up to reach the target capacity. Be careful to not have much greater target capacities as this will lead to unplausible generation capacities being connected to the different voltage levels. Also be aware that only technologies specified in the dictionary are expanded. Other technologies are kept the same. Default: None. allowed_number_of_comp_per_lv_bus : int Specifies, how many generators are at most allowed to be placed at the same LV bus. Default: 2. """ def _import_conv_generators(session): """ Import data for conventional generators from oedb. Returns ------- :pandas:`pandas.DataFrame<DataFrame>` Dataframe containing data on all conventional MV generators. You can find a full list of columns in :func:`edisgo.io.import_data.update_grids`. """ # build query generators_sqla = ( session.query( orm_conv_generators.columns.id, orm_conv_generators.columns.id.label("generator_id"), orm_conv_generators.columns.subst_id, orm_conv_generators.columns.la_id, orm_conv_generators.columns.capacity.label("p_nom"), orm_conv_generators.columns.voltage_level, orm_conv_generators.columns.fuel.label("generator_type"), func.ST_AsText( func.ST_Transform(orm_conv_generators.columns.geom, srid) ).label("geom"), ) .filter( orm_conv_generators.columns.subst_id == edisgo_object.topology.mv_grid.id ) .filter(orm_conv_generators.columns.voltage_level.in_([4, 5])) .filter(orm_conv_generators_version) ) return pd.read_sql_query( generators_sqla.statement, session.bind, index_col="id" ) def _import_res_generators(session): """ Import data for renewable generators from oedb. Returns ------- (:pandas:`pandas.DataFrame<DataFrame>`, :pandas:`pandas.DataFrame<DataFrame>`) Dataframe containing data on all renewable MV and LV generators. You can find a full list of columns in :func:`edisgo.io.import_data.update_grids`. Notes ----- If subtype is not specified it is set to 'unknown'. """ # build basic query generators_sqla = ( session.query( orm_re_generators.columns.id, orm_re_generators.columns.id.label("generator_id"), orm_re_generators.columns.subst_id, orm_re_generators.columns.la_id, orm_re_generators.columns.mvlv_subst_id, orm_re_generators.columns.electrical_capacity.label("p_nom"), orm_re_generators.columns.generation_type.label("generator_type"), orm_re_generators.columns.generation_subtype.label("subtype"), orm_re_generators.columns.voltage_level, orm_re_generators.columns.w_id.label("weather_cell_id"), func.ST_AsText( func.ST_Transform(orm_re_generators.columns.rea_geom_new, srid) ).label("geom"), func.ST_AsText( func.ST_Transform(orm_re_generators.columns.geom, srid) ).label("geom_em"), ) .filter( orm_re_generators.columns.subst_id == edisgo_object.topology.mv_grid.id ) .filter(orm_re_generators_version) ) # extend basic query for MV generators and read data from db generators_mv_sqla = generators_sqla.filter( orm_re_generators.columns.voltage_level.in_([4, 5]) ) gens_mv = pd.read_sql_query( generators_mv_sqla.statement, session.bind, index_col="id" ) # define generators with unknown subtype as 'unknown' gens_mv.loc[gens_mv["subtype"].isnull(), "subtype"] = "unknown" # convert capacity from kW to MW gens_mv.p_nom = pd.to_numeric(gens_mv.p_nom) / 1e3 # extend basic query for LV generators and read data from db generators_lv_sqla = generators_sqla.filter( orm_re_generators.columns.voltage_level.in_([6, 7]) ) gens_lv = pd.read_sql_query( generators_lv_sqla.statement, session.bind, index_col="id" ) # define generators with unknown subtype as 'unknown' gens_lv.loc[gens_lv["subtype"].isnull(), "subtype"] = "unknown" # convert capacity from kW to MW gens_lv.p_nom = pd.to_numeric(gens_lv.p_nom) / 1e3 return gens_mv, gens_lv def _validate_generation(): """ Validate generation capacity in updated grids. The validation uses the cumulative capacity of all generators. """ # set capacity difference threshold cap_diff_threshold = 10**-1 capacity_imported = ( generators_res_mv["p_nom"].sum() + generators_res_lv["p_nom"].sum() + generators_conv_mv["p_nom"].sum() ) capacity_grid = edisgo_object.topology.generators_df.p_nom.sum() logger.debug( f"Cumulative generator capacity (updated): {round(capacity_imported, 1)} MW" ) if abs(capacity_imported - capacity_grid) > cap_diff_threshold: raise ValueError( f"Cumulative capacity of imported generators (" f"{round(capacity_imported, 1)} MW) differs from cumulative capacity of" f" generators in updated grid ({round(capacity_grid, 1)} MW) by " f"{round(capacity_imported - capacity_grid, 1)} MW." ) else: logger.debug("Cumulative capacity of imported generators validated.") def _validate_sample_geno_location(): """ Checks that newly imported generators are located inside grid district. The check is performed for two randomly sampled generators. """ if ( all(generators_res_lv["geom"].notnull()) and all(generators_res_mv["geom"].notnull()) and not generators_res_lv["geom"].empty and not generators_res_mv["geom"].empty ): projection = proj2equidistant(srid) # get geom of 1 random MV and 1 random LV generator and transform sample_mv_geno_geom_shp = transform( projection, wkt_loads( generators_res_mv["geom"] .dropna() .sample(n=1, random_state=42) .values[0] ), ) sample_lv_geno_geom_shp = transform( projection, wkt_loads( generators_res_lv["geom"] .dropna() .sample(n=1, random_state=42) .values[0] ), ) # get geom of MV grid district mvgd_geom_shp = transform( projection, edisgo_object.topology.grid_district["geom"], ) # check if MVGD contains geno if not ( mvgd_geom_shp.contains(sample_mv_geno_geom_shp) and mvgd_geom_shp.contains(sample_lv_geno_geom_shp) ): raise ValueError( "At least one imported generator is not located in the MV " "grid area. Check compatibility of grid and generator " "datasets." ) oedb_data_source = edisgo_object.config["data_source"]["oedb_data_source"] srid = edisgo_object.topology.grid_district["srid"] # load ORM names orm_conv_generators_name = ( edisgo_object.config[oedb_data_source]["conv_generators_prefix"] + generator_scenario + edisgo_object.config[oedb_data_source]["conv_generators_suffix"] ) orm_re_generators_name = ( edisgo_object.config[oedb_data_source]["re_generators_prefix"] + generator_scenario + edisgo_object.config[oedb_data_source]["re_generators_suffix"] ) if oedb_data_source == "model_draft": # import ORMs orm_conv_generators = model_draft.__getattribute__(orm_conv_generators_name) orm_re_generators = model_draft.__getattribute__(orm_re_generators_name) # set dummy version condition (select all generators) orm_conv_generators_version = 1 == 1 orm_re_generators_version = 1 == 1 elif oedb_data_source == "versioned": data_version = edisgo_object.config["versioned"]["version"] # import ORMs orm_conv_generators = supply.__getattribute__(orm_conv_generators_name) orm_re_generators = supply.__getattribute__(orm_re_generators_name) # set version condition orm_conv_generators_version = ( orm_conv_generators.columns.version == data_version ) orm_re_generators_version = orm_re_generators.columns.version == data_version # get conventional and renewable generators with session_scope() as session: generators_conv_mv = _import_conv_generators(session) generators_res_mv, generators_res_lv = _import_res_generators(session) generators_mv = pd.concat( [ generators_conv_mv, generators_res_mv, ] ) # validate that imported generators are located inside the grid district _validate_sample_geno_location() _update_grids( edisgo_object=edisgo_object, imported_generators_mv=generators_mv, imported_generators_lv=generators_res_lv, **kwargs, ) if kwargs.get("p_target", None) is None: _validate_generation()
def _update_grids( edisgo_object, imported_generators_mv, imported_generators_lv, remove_decommissioned=True, update_existing=True, p_target=None, allowed_number_of_comp_per_lv_bus=2, **kwargs, ): """ Update network according to new generator dataset. Steps are: * Step 1: Update capacity of existing generators if `update_existing` is True, which it is by default. * Step 2: Remove decommissioned generators if `remove_decommissioned` is True, which it is by default. * Step 3: Integrate new MV generators. * Step 4: Integrate new LV generators. Parameters ---------- edisgo_object : :class:`~.EDisGo` imported_generators_mv : :pandas:`pandas.DataFrame<DataFrame>` Dataframe containing all MV generators. Index of the dataframe are the generator IDs. Columns are: * p_nom : float Nominal capacity in MW. * generator_type : str Type of generator (e.g. 'solar'). * subtype : str Subtype of generator (e.g. 'solar_roof_mounted'). * voltage_level : int Voltage level to connect to. Can be 4, 5, 6 or 7. * weather_cell_id : int Weather cell the generator is in. Only given for fluctuating generators. * geom : :shapely:`Shapely Point object<points>` Geolocation of generator. For CRS see config_grid.srid. * geom_em: :shapely:`Shapely Point object<points>` Geolocation of generator as given in energy map. For CRS see config_grid.srid. imported_generators_lv : :pandas:`pandas.DataFrame<DataFrame>` Dataframe containing all LV generators. Index of the dataframe are the generator IDs. Columns are the same as in `imported_generators_mv` plus: * mvlv_subst_id : int or float ID of MV-LV substation in grid = grid, the generator will be connected to. remove_decommissioned : bool See :func:`edisgo.io.generators_import.oedb` for more information. update_existing : bool See :func:`edisgo.io.generators_import.oedb` for more information. p_target : dict See :func:`edisgo.io.generators_import.oedb` for more information. allowed_number_of_comp_per_lv_bus : int See :func:`edisgo.io.generators_import.oedb` for more information. """ def _check_mv_generator_geom(generator_data): """ Checks if a valid geom is available in dataset. If yes, this geom will be used. If not, geom from EnergyMap is used if available. Parameters ---------- generator_data : :pandas:`pandas.Series<Series>` Series with among others 'geom' (geometry from open_eGo data processing) and 'geom_em' (geometry from EnergyMap). Returns ------- :shapely:`Shapely Point object<points>` or None Geometry of generator. None, if no geometry is available. """ # check if geom is available if generator_data.geom: return generator_data.geom else: # set geom to EnergyMap's geom, if available if generator_data.geom_em: logger.debug( "Generator {} has no geom entry, EnergyMap's geom " "entry will be used.".format(generator_data.name) ) return generator_data.geom_em return None # set capacity difference threshold cap_diff_threshold = 10**-4 # get all imported generators imported_gens = pd.concat( [imported_generators_lv, imported_generators_mv], sort=True ) logger.debug(f"{len(imported_gens)} generators imported.") # get existing generators and append ID column existing_gens = edisgo_object.topology.generators_df existing_gens["id"] = list( map(lambda _: int(_.split("_")[-1]), existing_gens.index) ) logger.debug( "Cumulative generator capacity (existing): " f"{round(existing_gens.p_nom.sum(), 1)} MW" ) # check if capacity of any of the imported generators is <= 0 # (this may happen if dp is buggy) and remove generator if it is gens_to_remove = imported_gens[imported_gens.p_nom <= 0] for id in gens_to_remove.index: # remove from topology (if generator exists) if id in existing_gens.id.values: gen_name = existing_gens[existing_gens.id == id].index[0] edisgo_object.topology.remove_generator(gen_name) logger.warning( "Capacity of generator {} is <= 0, it is therefore " "removed. Check your data source.".format(gen_name) ) # remove from imported generators imported_gens.drop(id, inplace=True) if id in imported_generators_mv.index: imported_generators_mv.drop(id, inplace=True) else: imported_generators_lv.drop(id, inplace=True) # ============================================= # Step 1: Update existing MV and LV generators # ============================================= if update_existing: # filter for generators that need to be updated (i.e. that # appear in the imported and existing generators dataframes) gens_to_update = existing_gens[ existing_gens.id.isin(imported_gens.index.values) ] # calculate capacity difference between existing and imported # generators gens_to_update["cap_diff"] = ( imported_gens.loc[gens_to_update.id, "p_nom"].values - gens_to_update.p_nom ) # in case there are generators whose capacity does not match, update # their capacity gens_to_update_cap = gens_to_update[ abs(gens_to_update.cap_diff) > cap_diff_threshold ] for id, row in gens_to_update_cap.iterrows(): edisgo_object.topology._generators_df.loc[id, "p_nom"] = imported_gens.loc[ row["id"], "p_nom" ] log_geno_count = len(gens_to_update_cap) log_geno_cap = gens_to_update_cap["cap_diff"].sum() logger.debug( "Capacities of {} of {} existing generators updated " "({} MW).".format( log_geno_count, len(gens_to_update), round(log_geno_cap, 1) ) ) # ================================================== # Step 2: Remove decommissioned MV and LV generators # ================================================== # filter for generators that do not appear in the imported but in # the existing generators dataframe decommissioned_gens = existing_gens[ ~existing_gens.id.isin(imported_gens.index.values) ] if not decommissioned_gens.empty and remove_decommissioned: for gen in decommissioned_gens.index: edisgo_object.topology.remove_generator(gen) log_geno_cap = decommissioned_gens.p_nom.sum() log_geno_count = len(decommissioned_gens) logger.debug( "{} decommissioned generators removed ({} MW).".format( log_geno_count, round(log_geno_cap, 1) ) ) # =================================== # Step 3: Integrate new MV generators # =================================== new_gens_mv = imported_generators_mv[ ~imported_generators_mv.index.isin(list(existing_gens.id)) ] new_gens_mv = new_gens_mv.assign( p=new_gens_mv.p_nom, ) new_gens_lv = imported_generators_lv[ ~imported_generators_lv.index.isin(list(existing_gens.id)) ] new_gens_lv = new_gens_lv.assign( p=new_gens_lv.p_nom, ) if p_target is not None: def update_imported_gens(layer, imported_gens): def drop_generators(generator_list, gen_type, total_capacity): random.seed(42) while ( generator_list[ generator_list["generator_type"] == gen_type ].p_nom.sum() > total_capacity and len( generator_list[generator_list["generator_type"] == gen_type] ) > 0 ): generator_list.drop( random.choice( generator_list[ generator_list["generator_type"] == gen_type ].index ), inplace=True, ) for gen_type in p_target.keys(): # Currently installed capacity existing_capacity = existing_gens[ existing_gens.index.isin(layer) & (existing_gens["type"] == gen_type).values ].p_nom.sum() # installed capacity in scenario expanded_capacity = ( existing_capacity + imported_gens[ imported_gens["generator_type"] == gen_type ].p_nom.sum() ) # target capacity target_capacity = p_target[gen_type] # required expansion required_expansion = target_capacity - existing_capacity # No generators to be expanded if ( imported_gens[ imported_gens["generator_type"] == gen_type ].p_nom.sum() == 0 ): continue # Reduction in capacity over status quo, so skip all expansion if required_expansion <= 0: imported_gens.drop( imported_gens[ imported_gens["generator_type"] == gen_type ].index, inplace=True, ) continue # More expansion than in NEP2035 required, keep all generators # and scale them up later if p_target[gen_type] >= expanded_capacity: continue # Reduced expansion, remove some generators from expansion drop_generators(imported_gens, gen_type, required_expansion) new_gens = pd.concat([new_gens_lv, new_gens_mv], sort=True) update_imported_gens(edisgo_object.topology.generators_df.index, new_gens) # drop types not in p_target from new_gens for gen_type in new_gens.generator_type.unique(): if gen_type not in p_target.keys(): new_gens.drop( new_gens[new_gens["generator_type"] == gen_type].index, inplace=True, ) new_gens_lv = new_gens[new_gens.voltage_level.isin([6, 7])] new_gens_mv = new_gens[new_gens.voltage_level.isin([4, 5])] # iterate over new generators and create them number_new_gens = len(new_gens_mv) for id in new_gens_mv.index.sort_values(ascending=True): # check if geom is available, skip otherwise geom = _check_mv_generator_geom(new_gens_mv.loc[id, :]) if geom is None: logger.warning( "Generator {} has no geom entry and will not be imported!".format(id) ) new_gens_mv.drop(id) continue new_gens_mv.at[id, "geom"] = geom edisgo_object.topology.connect_to_mv( edisgo_object, dict(new_gens_mv.loc[id, :]) ) log_geno_count = len(new_gens_mv) log_geno_cap = new_gens_mv["p_nom"].sum() logger.debug( "{} of {} new MV generators added ({} MW).".format( log_geno_count, number_new_gens, round(log_geno_cap, 1) ) ) # ==================================== # Step 4: Integrate new LV generators # ==================================== # check if new generators can be allocated to an existing LV grid if not imported_generators_lv.empty: grid_ids = edisgo_object.topology._lv_grid_ids if not any( [ int(_) in grid_ids for _ in list(imported_generators_lv["mvlv_subst_id"]) if not np.isnan(_) ] ): logger.warning( "None of the imported LV generators can be allocated " "to an existing LV grid. Check compatibility of grid " "and generator datasets." ) substations = edisgo_object.topology.buses_df.loc[ edisgo_object.topology.transformers_df.bus1.unique() ] new_gens_lv.geom = new_gens_lv.geom.apply(wkt_loads) new_gens_lv = gpd.GeoDataFrame( new_gens_lv, geometry="geom", crs=f"EPSG:{edisgo_object.topology.grid_district['srid']}", ) # iterate over new generators and create them for id in new_gens_lv.index.sort_values(ascending=True): comp_data = dict(new_gens_lv.loc[id, :]) try: nearest_substation, _ = find_nearest_bus(comp_data["geom"], substations) comp_data["mvlv_subst_id"] = int(nearest_substation.split("_")[-2]) except AttributeError: pass edisgo_object.topology.connect_to_lv( edisgo_object, comp_data, allowed_number_of_comp_per_bus=allowed_number_of_comp_per_lv_bus, ) def scale_generators(gen_type, total_capacity): idx = edisgo_object.topology.generators_df["type"] == gen_type current_capacity = edisgo_object.topology.generators_df[idx].p_nom.sum() if current_capacity != 0: edisgo_object.topology.generators_df.loc[idx, "p_nom"] *= ( total_capacity / current_capacity ) if p_target is not None: for gen_type, target_cap in p_target.items(): scale_generators(gen_type, target_cap) log_geno_count = len(new_gens_lv) log_geno_cap = new_gens_lv["p_nom"].sum() logger.debug( "{} new LV generators added ({} MW).".format( log_geno_count, round(log_geno_cap, 1) ) ) for lv_grid in edisgo_object.topology.mv_grid.lv_grids: lv_loads = len(lv_grid.loads_df) lv_gens_voltage_level_7 = len( lv_grid.generators_df[lv_grid.generators_df.bus != lv_grid.station.index[0]] ) # warn if there are more generators than loads in LV grid if lv_gens_voltage_level_7 > lv_loads * 2: logger.debug( "There are {} generators (voltage level 7) but only {} " "loads in LV grid {}.".format( lv_gens_voltage_level_7, lv_loads, lv_grid.id ) )
[docs] def oedb( edisgo_object: EDisGo, scenario: str, engine: Engine, max_capacity=20, ): """ Gets generator park for specified scenario from oedb and integrates generators into the grid. The data is imported from the tables supply.egon_chp_plants, supply.egon_power_plants and supply.egon_power_plants_pv_roof_building. For the grid integration it is distinguished between PV rooftop plants and all other power plants. For PV rooftop the following steps are conducted: * Removes decommissioned PV rooftop plants (plants whose source ID cannot be matched to a source ID of an existing plant). * Updates existing PV rooftop plants. The following two cases are distinguished: * Nominal power increases: It is checked, if plant needs to be connected to a higher voltage level and if that is the case, the existing plant is removed from the grid and the new one integrated based on the geolocation. * Nominal power decreases: Nominal power of existing plant is overwritten. * Integrates new PV rooftop plants at corresponding building ID. If the plant needs to be connected to a higher voltage level than the building, it is integrated based on the geolocation. For all other power plants the following steps are conducted: * Removes decommissioned power and CHP plants (all plants that do not have a source ID or whose source ID can not be matched to a new plant and are not of subtype pv_rooftop, as these are handled in a separate function) * Updates existing power plants (plants whose source ID can be matched; solar, wind and CHP plants never have a source ID in the future scenarios and are therefore never updated). The following two cases are distinguished: * Nominal power increases: It is checked, if plant needs to be connected to a higher voltage level and if that is the case, the existing plant is removed from the grid and the new one integrated based on the geolocation. * Nominal power decreases: Nominal power of existing plant is overwritten. * Integrates new power and CHP plants based on the geolocation. Parameters ---------- edisgo_object : :class:`~.EDisGo` scenario : str Scenario for which to retrieve generator data. Possible options are "eGon2035" and "eGon100RE". engine : :sqlalchemy:`sqlalchemy.Engine<sqlalchemy.engine.Engine>` Database engine. max_capacity : float Maximum capacity in MW of power plants to retrieve from database. In general, the generators that are retrieved from the database are selected based on the voltage level they are in. In some cases, the voltage level is not correct as it was wrongly set in the MaStR dataset. To avoid having unrealistically large generators in the grids, an upper limit is also set. Per default this is 20 MW. Notes ------ Note, that PV rooftop plants are queried using the building IDs not the MV grid ID as in egon_data buildings are mapped to a grid based on the zensus cell they are in whereas in ding0 buildings are mapped to a grid based on the geolocation. As it can happen that buildings lie outside an MV grid but within a zensus cell that is assigned to that MV grid, they are mapped differently in egon_data and ding0, and it is therefore better to query using the building IDs. """ def _get_egon_power_plants(): with session_scope_egon_data(engine) as session: srid_table = get_srid_of_db_table(session, egon_power_plants.geom) query = ( session.query( egon_power_plants.id.label("generator_id"), egon_power_plants.source_id, egon_power_plants.carrier.label("type"), egon_power_plants.el_capacity.label("p_nom"), egon_power_plants.weather_cell_id, egon_power_plants.geom, ) .filter( egon_power_plants.scenario == scenario, egon_power_plants.voltage_level >= 4, egon_power_plants.el_capacity <= max_capacity, egon_power_plants.bus_id == edisgo_object.topology.id, ) .order_by(egon_power_plants.id) ) power_plants_gdf = gpd.read_postgis( sql=query.statement, con=engine, crs=f"EPSG:{srid_table}" ).to_crs(srid_edisgo) # rename wind_onshore to wind power_plants_gdf["type"] = power_plants_gdf["type"].str.replace("_onshore", "") # set subtype mapping = { "wind": "wind_onshore", "solar": "solar_ground_mounted", } power_plants_gdf = power_plants_gdf.assign( subtype=power_plants_gdf["type"].map(mapping) ) # unwrap source ID if not power_plants_gdf.empty: power_plants_gdf["source_id"] = power_plants_gdf.apply( lambda _: ( list(_["source_id"].values())[0] if isinstance(_["source_id"], dict) else None ), axis=1, ) return power_plants_gdf def _get_egon_pv_rooftop(): # egon_power_plants_pv_roof_building - queried using building IDs instead of # grid ID because it can happen that buildings lie outside an MV grid but within # a zensus cell that is assigned to that MV grid and are therefore sometimes # mapped to the MV grid they lie within and sometimes to the MV grid the zensus # cell is mapped to building_ids = edisgo_object.topology.loads_df.building_id.unique() with session_scope_egon_data(engine) as session: query = ( session.query( egon_power_plants_pv_roof_building.index.label("generator_id"), egon_power_plants_pv_roof_building.building_id, egon_power_plants_pv_roof_building.gens_id.label("source_id"), egon_power_plants_pv_roof_building.capacity.label("p_nom"), egon_power_plants_pv_roof_building.weather_cell_id, ) .filter( egon_power_plants_pv_roof_building.scenario == scenario, egon_power_plants_pv_roof_building.building_id.in_(building_ids), egon_power_plants_pv_roof_building.voltage_level >= 4, egon_power_plants_pv_roof_building.capacity <= max_capacity, ) .order_by(egon_power_plants_pv_roof_building.index) ) pv_roof_df = pd.read_sql(sql=query.statement, con=engine) # add type and subtype pv_roof_df = pv_roof_df.assign( type="solar", subtype="pv_rooftop", ) return pv_roof_df def _get_egon_chp_plants(): with session_scope_egon_data(engine) as session: srid_table = get_srid_of_db_table(session, egon_chp_plants.geom) query = ( session.query( egon_chp_plants.id.label("generator_id"), egon_chp_plants.carrier.label("type"), egon_chp_plants.district_heating_area_id.label( "district_heating_id" ), egon_chp_plants.el_capacity.label("p_nom"), egon_chp_plants.th_capacity.label("p_nom_th"), egon_chp_plants.geom, ) .filter( egon_chp_plants.scenario == scenario, egon_chp_plants.voltage_level >= 4, egon_chp_plants.el_capacity <= max_capacity, egon_chp_plants.electrical_bus_id == edisgo_object.topology.id, ) .order_by(egon_chp_plants.id) ) chp_gdf = gpd.read_postgis( sql=query.statement, con=query.session.bind, crs=f"EPSG:{srid_table}" ).to_crs(srid_edisgo) return chp_gdf saio.register_schema("supply", engine) from saio.supply import ( egon_chp_plants, egon_power_plants, egon_power_plants_pv_roof_building, ) # get generator data from database srid_edisgo = edisgo_object.topology.grid_district["srid"] pv_rooftop_df = _get_egon_pv_rooftop() power_plants_gdf = _get_egon_power_plants() chp_gdf = _get_egon_chp_plants() # determine number of generators and installed capacity in future scenario # for validation of grid integration total_p_nom_scenario = ( pv_rooftop_df.p_nom.sum() + power_plants_gdf.p_nom.sum() + chp_gdf.p_nom.sum() ) total_gen_count_scenario = len(pv_rooftop_df) + len(power_plants_gdf) + len(chp_gdf) # integrate into grid (including removal of decommissioned plants and update of # still existing power plants) _integrate_pv_rooftop(edisgo_object, pv_rooftop_df) _integrate_power_and_chp_plants(edisgo_object, power_plants_gdf, chp_gdf) # check number of generators and installed capacity in grid gens_in_grid = edisgo_object.topology.generators_df if not len(gens_in_grid) == total_gen_count_scenario: raise ValueError( f"Number of power plants in future scenario is not correct. Should be " f"{total_gen_count_scenario} instead of {len(gens_in_grid)}." ) if not np.isclose(gens_in_grid.p_nom.sum(), total_p_nom_scenario, atol=1e-4): raise ValueError( f"Capacity of power plants in future scenario not correct. Should be " f"{total_p_nom_scenario} instead of " f"{gens_in_grid.p_nom.sum()}." ) return
def _integrate_pv_rooftop(edisgo_object, pv_rooftop_df): """ This function updates generator park for PV rooftop plants. See function :func:`~.io.generators_import.oedb` for more information. Parameters ---------- edisgo_object : :class:`~.EDisGo` pv_rooftop_df : :pandas:`pandas.DataFrame<DataFrame>` Dataframe containing data on PV rooftop plants per building. Columns are: * p_nom : float Nominal power in MW. * building_id : int Building ID of the building the PV plant is allocated. * generator_id : int ID of the PV plant from database. * type : str Generator type, which here is always "solar". * subtype Further specification of generator type, which here is always "pv_rooftop". * weather_cell_id : int Weather cell the PV plant is in used to obtain the potential feed-in time series. * source_id : int MaStR ID of the PV plant. """ # match building ID to existing solar generators loads_df = edisgo_object.topology.loads_df busses_building_id = ( loads_df[loads_df.type == "conventional_load"] .drop_duplicates(subset=["building_id"]) .set_index("bus") .loc[:, ["building_id"]] ) gens_df = edisgo_object.topology.generators_df[ edisgo_object.topology.generators_df.subtype == "pv_rooftop" ].copy() gens_df_building_id = gens_df.loc[:, ["bus"]].join( busses_building_id, how="left", on="bus" ) # using update to make sure to not overwrite existing building ID information if "building_id" not in gens_df.columns: gens_df["building_id"] = None gens_df.update(gens_df_building_id, overwrite=False) # remove decommissioned PV rooftop plants gens_decommissioned = gens_df[ ~gens_df.source_id.isin(pv_rooftop_df.source_id.unique()) ] for gen in gens_decommissioned.index: edisgo_object.remove_component(comp_type="generator", comp_name=gen) # update existing PV rooftop plants gens_existing = gens_df[gens_df.source_id.isin(pv_rooftop_df.source_id.unique())] # merge new information gens_existing.index.name = "gen_name" pv_rooftop_df.index.name = "gen_index_new" gens_existing = pd.merge( gens_existing.reset_index(), pv_rooftop_df.reset_index(), how="left", on="source_id", suffixes=("_old", ""), ).set_index("gen_name") # add building id edisgo_object.topology.generators_df.loc[ gens_existing.index, "building_id" ] = gens_existing.building_id # update plants where capacity decreased gens_decreased_cap = gens_existing.query("p_nom < p_nom_old") if len(gens_decreased_cap) > 0: edisgo_object.topology.generators_df.loc[ gens_decreased_cap.index, "p_nom" ] = gens_decreased_cap.p_nom # update plants where capacity increased gens_increased_cap = gens_existing.query("p_nom > p_nom_old") for gen in gens_increased_cap.index: voltage_level_new = determine_grid_integration_voltage_level( edisgo_object, gens_increased_cap.at[gen, "p_nom"] ) voltage_level_old = determine_bus_voltage_level( edisgo_object, gens_increased_cap.at[gen, "bus"] ) if voltage_level_new >= voltage_level_old: # simply update p_nom if plant doesn't need to be connected to higher # voltage level edisgo_object.topology.generators_df.at[ gen, "p_nom" ] = gens_increased_cap.at[gen, "p_nom"] else: # if plant needs to be connected to higher voltage level, remove existing # plant and integrate new component based on geolocation bus = gens_increased_cap.at[gen, "bus"] x_coord = edisgo_object.topology.buses_df.at[bus, "x"] y_coord = edisgo_object.topology.buses_df.at[bus, "y"] edisgo_object.remove_component(comp_type="generator", comp_name=gen) edisgo_object.integrate_component_based_on_geolocation( comp_type="generator", voltage_level=voltage_level_new, geolocation=( x_coord, y_coord, ), add_ts=False, generator_id=gens_increased_cap.at[gen, "generator_id"], p_nom=gens_increased_cap.at[gen, "p_nom"], building_id=gens_increased_cap.at[gen, "building_id"], generator_type=gens_increased_cap.at[gen, "type"], subtype=gens_increased_cap.at[gen, "subtype"], weather_cell_id=gens_increased_cap.at[gen, "weather_cell_id"], source_id=gens_increased_cap.at[gen, "source_id"], ) # integrate new PV rooftop plants into grid new_pv_rooftop_plants = pv_rooftop_df[ ~pv_rooftop_df.index.isin(gens_existing.gen_index_new) ] if len(new_pv_rooftop_plants) > 0: _, new_pv_own_grid_conn = _integrate_new_pv_rooftop_to_buildings( edisgo_object, new_pv_rooftop_plants ) else: new_pv_own_grid_conn = [] # check number and installed capacity of PV rooftop plants in grid pv_rooftop_gens_in_grid = edisgo_object.topology.generators_df[ edisgo_object.topology.generators_df.subtype == "pv_rooftop" ] if not len(pv_rooftop_gens_in_grid) == len(pv_rooftop_df): raise ValueError( f"Number of PV rooftop plants in future scenario is not correct. Should be " f"{len(pv_rooftop_df)} instead of {len(pv_rooftop_gens_in_grid)}." ) if not np.isclose( pv_rooftop_gens_in_grid.p_nom.sum(), pv_rooftop_df.p_nom.sum(), atol=1e-4 ): raise ValueError( f"Capacity of PV rooftop plants in future scenario is not correct. Should " f"be {pv_rooftop_df.p_nom.sum()} instead of " f"{pv_rooftop_gens_in_grid.p_nom.sum()}." ) # logging messages logger.debug( f"{pv_rooftop_gens_in_grid.p_nom.sum():.2f} MW of PV rooftop plants " f"integrated. Of this, {gens_existing.p_nom.sum():.2f} MW could be matched to " f"an existing PV rooftop plant." ) if len(new_pv_own_grid_conn) > 0: logger.debug( f"Of the PV rooftop plants that could not be matched to an existing PV " f"plant, " f"{sum(pv_rooftop_gens_in_grid.loc[new_pv_own_grid_conn, 'p_nom']):.2f} " f"MW was integrated at a new bus." ) def _integrate_new_pv_rooftop_to_buildings(edisgo_object, pv_rooftop_df): """ Integrates new PV rooftop plants based on corresponding building ID. Parameters ---------- edisgo_object : :class:`~.EDisGo` pv_rooftop_df : :pandas:`pandas.DataFrame<DataFrame>` See :attr:`~.io.generators_import._integrate_pv_rooftop` for more information. Returns ------- (list(str), list(str)) Two lists with names (as in index of :attr:`~.network.topology.Topology.generators_df`) of all integrated PV rooftop plants and PV rooftop plants integrated to a different grid connection point than the building. """ # join busses corresponding to building ID loads_df = edisgo_object.topology.loads_df building_id_busses = ( loads_df[loads_df.type == "conventional_load"] .drop_duplicates(subset=["building_id"]) .set_index("building_id") .loc[:, ["bus"]] ) pv_rooftop_df = pv_rooftop_df.join(building_id_busses, how="left", on="building_id") # add further information needed in generators_df pv_rooftop_df["control"] = "PQ" # add generator name as index pv_rooftop_df["index"] = pv_rooftop_df.apply( lambda _: f"Generator_pv_rooftop_{_.building_id}", axis=1 ) pv_rooftop_df.set_index("index", drop=True, inplace=True) # add voltage level for gen in pv_rooftop_df.index: pv_rooftop_df.at[ gen, "voltage_level" ] = determine_grid_integration_voltage_level( edisgo_object, pv_rooftop_df.at[gen, "p_nom"] ) # check for duplicated generator names and choose random name for duplicates tmp = pv_rooftop_df.index.append(edisgo_object.topology.storage_units_df.index) duplicated_indices = tmp[tmp.duplicated()] for duplicate in duplicated_indices: # find unique name random.seed(a=duplicate) new_name = duplicate while new_name in tmp: new_name = f"{duplicate}_{random.randint(10 ** 1, 10 ** 2)}" # change name in pv_rooftop_df pv_rooftop_df.rename(index={duplicate: new_name}, inplace=True) # filter PV plants that are too large to be integrated into LV pv_rooftop_large = pv_rooftop_df[pv_rooftop_df.voltage_level < 7] pv_rooftop_small = pv_rooftop_df[pv_rooftop_df.voltage_level == 7] # integrate small batteries at buildings cols = [ "bus", "control", "p_nom", "weather_cell_id", "building_id", "type", "subtype", "source_id", ] edisgo_object.topology.generators_df = pd.concat( [edisgo_object.topology.generators_df, pv_rooftop_small.loc[:, cols]] ) integrated_plants = pv_rooftop_small.index # integrate larger PV rooftop plants - if load is already connected to # higher voltage level it can be integrated at same bus, otherwise it is # integrated based on geolocation integrated_plants_own_grid_conn = pd.Index([]) for pv_pp in pv_rooftop_large.index: # check if building is already connected to a voltage level equal to or # higher than the voltage level the PV plant should be connected to bus = pv_rooftop_large.at[pv_pp, "bus"] voltage_level_bus = determine_bus_voltage_level(edisgo_object, bus) voltage_level_pv = pv_rooftop_large.at[pv_pp, "voltage_level"] if voltage_level_pv >= voltage_level_bus: # integrate at same bus as load edisgo_object.topology.generators_df = pd.concat( [ edisgo_object.topology.generators_df, pv_rooftop_large.loc[[pv_pp], cols], ] ) integrated_plants = integrated_plants.append(pd.Index([pv_pp])) else: # integrate based on geolocation pv_pp_name = edisgo_object.integrate_component_based_on_geolocation( comp_type="generator", voltage_level=voltage_level_pv, geolocation=( edisgo_object.topology.buses_df.at[bus, "x"], edisgo_object.topology.buses_df.at[bus, "y"], ), add_ts=False, generator_id=pv_rooftop_large.at[pv_pp, "generator_id"], p_nom=pv_rooftop_large.at[pv_pp, "p_nom"], building_id=pv_rooftop_large.at[pv_pp, "building_id"], generator_type=pv_rooftop_large.at[pv_pp, "type"], subtype=pv_rooftop_large.at[pv_pp, "subtype"], weather_cell_id=pv_rooftop_large.at[pv_pp, "weather_cell_id"], source_id=pv_rooftop_large.at[pv_pp, "source_id"], ) integrated_plants = integrated_plants.append(pd.Index([pv_pp_name])) integrated_plants_own_grid_conn = integrated_plants_own_grid_conn.append( pd.Index([pv_pp_name]) ) # check if all PV plants were integrated if not len(pv_rooftop_df) == len(integrated_plants): raise ValueError("Not all PV rooftop plants could be integrated into the grid.") return integrated_plants, integrated_plants_own_grid_conn def _integrate_power_and_chp_plants(edisgo_object, power_plants_gdf, chp_gdf): """ This function updates generator park for all power plants except PV rooftop. See function :func:`~.io.generators_import.oedb` for more information. Parameters ---------- edisgo_object : :class:`~.EDisGo` power_plants_gdf : :geopandas:`geopandas.GeoDataFrame<GeoDataFrame>` Dataframe containing data on power plants. Columns are: * p_nom : float Nominal power in MW. * generator_id : int ID of the power plant from database. * type : str Generator type, e.g. "wind". * subtype Further specification of generator type, e.g. "wind_onshore". * weather_cell_id : int Weather cell the power plant is in used to obtain the potential feed-in time series. Only given for solar and wind generators. * source_id : int MaStR ID of the power plant. * geom : geometry Geolocation of power plant. chp_gdf : :geopandas:`geopandas.GeoDataFrame<GeoDataFrame>` Dataframe containing data on CHP plants. Columns are: * p_nom : float Nominal power in MW. * p_nom_th : float Thermal nominal power in MW. * generator_id : int ID of the CHP plant from database. * type : str Generator type, e.g. "gas". * district_heating_id : int ID of district heating network the CHP plant is in. * geom : geometry Geolocation of power plant. """ def _integrate_new_chp_plant(edisgo_object, comp_data): edisgo_object.integrate_component_based_on_geolocation( comp_type="generator", generator_id=comp_data.at["generator_id"], geolocation=comp_data.at["geom"], add_ts=False, p_nom=comp_data.at["p_nom"], p_nom_th=comp_data.at["p_nom_th"], generator_type=comp_data.at["type"], district_heating_id=comp_data.at["district_heating_id"], ) def _integrate_new_power_plant(edisgo_object, comp_data): edisgo_object.integrate_component_based_on_geolocation( comp_type="generator", generator_id=comp_data.at["generator_id"], geolocation=comp_data.at["geom"], add_ts=False, p_nom=comp_data.at["p_nom"], generator_type=comp_data.at["type"], subtype=comp_data.at["subtype"], weather_cell_id=comp_data.at["weather_cell_id"], source_id=comp_data.at["source_id"], ) # determine number of generators and installed capacity in future scenario # for validation of grid integration total_p_nom_scenario = power_plants_gdf.p_nom.sum() + chp_gdf.p_nom.sum() total_gen_count_scenario = len(power_plants_gdf) + len(chp_gdf) # remove all power plants that are not PV rooftop and do not have a source ID gens_df = edisgo_object.topology.generators_df[ edisgo_object.topology.generators_df.subtype != "pv_rooftop" ].copy() if "source_id" not in gens_df.columns: gens_df["source_id"] = None gens_decommissioned = gens_df[gens_df.source_id.isna()] for gen in gens_decommissioned.index: edisgo_object.remove_component(comp_type="generator", comp_name=gen) # try matching power plants with source ID, to update power plants that exist in # status quo and future scenario existing_gens_with_source = gens_df[~gens_df.source_id.isna()] if len(existing_gens_with_source) > 0: # join dataframes at source ID existing_gens_with_source.index.name = "gen_name" power_plants_gdf.index.name = "gen_index_new" existing_gens_with_source_matched = pd.merge( existing_gens_with_source.reset_index(), power_plants_gdf.reset_index(), how="inner", on="source_id", suffixes=("_old", ""), ).set_index("gen_name") # remove existing gens where source ID could not be matched existing_gens_without_source_matched = [ _ for _ in existing_gens_with_source.index if _ not in existing_gens_with_source_matched.index ] for gen in existing_gens_without_source_matched: edisgo_object.remove_component(comp_type="generator", comp_name=gen) # where source ID could be matched, check if capacity increased or decreased # update plants where capacity decreased gens_decreased_cap = existing_gens_with_source_matched.query( "p_nom < p_nom_old" ) if len(gens_decreased_cap) > 0: edisgo_object.topology.generators_df.loc[ gens_decreased_cap.index, "p_nom" ] = gens_decreased_cap.p_nom # update plants where capacity increased gens_increased_cap = existing_gens_with_source_matched.query( "p_nom > p_nom_old" ) for gen in gens_increased_cap.index: voltage_level_new = determine_grid_integration_voltage_level( edisgo_object, gens_increased_cap.at[gen, "p_nom"] ) voltage_level_old = determine_bus_voltage_level( edisgo_object, gens_increased_cap.at[gen, "bus"] ) if voltage_level_new >= voltage_level_old: # simply update p_nom if plant doesn't need to be connected to higher # voltage level edisgo_object.topology.generators_df.at[ gen, "p_nom" ] = gens_increased_cap.at[gen, "p_nom"] else: # if plant needs to be connected to higher voltage level, remove # existing plant and integrate new component based on geolocation edisgo_object.remove_component(comp_type="generator", comp_name=gen) _integrate_new_power_plant(edisgo_object, gens_increased_cap.loc[gen]) else: existing_gens_with_source_matched = pd.DataFrame( columns=["gen_index_new", "p_nom"] ) # gens where source ID could not be matched are all new new_power_plants = power_plants_gdf[ ~power_plants_gdf.index.isin(existing_gens_with_source_matched.gen_index_new) ] for gen in new_power_plants.index: _integrate_new_power_plant(edisgo_object, new_power_plants.loc[gen]) # add all CHP plants based on geolocation for gen in chp_gdf.index: _integrate_new_chp_plant(edisgo_object, chp_gdf.loc[gen]) # check number of power and CHP plants in grid as well as installed capacity gens_in_grid = edisgo_object.topology.generators_df[ edisgo_object.topology.generators_df.subtype != "pv_rooftop" ] if not len(gens_in_grid) == total_gen_count_scenario: raise ValueError( f"Number of power plants in future scenario is not correct. Should be " f"{total_gen_count_scenario} instead of {len(gens_in_grid)}." ) if not np.isclose(gens_in_grid.p_nom.sum(), total_p_nom_scenario, atol=1e-4): raise ValueError( f"Capacity of power plants in future scenario not correct. Should be " f"{total_p_nom_scenario} instead of " f"{gens_in_grid.p_nom.sum()}." ) # logging messages cap_matched = existing_gens_with_source_matched.p_nom.sum() logger.debug( f"{total_p_nom_scenario:.2f} MW of power and CHP plants integrated. Of this, " f"{cap_matched:.2f} MW could be matched to existing power plants." )