from __future__ import annotations
import json
import logging
import os
from collections import Counter
from pathlib import Path, PurePath
from typing import TYPE_CHECKING
import numpy as np
import pandas as pd
import saio
from numpy.random import default_rng
from sklearn import preprocessing
from sqlalchemy.engine.base import Engine
from edisgo.io.db import get_srid_of_db_table, session_scope_egon_data
if "READTHEDOCS" not in os.environ:
import geopandas as gpd
if TYPE_CHECKING:
from edisgo import EDisGo
logger = logging.getLogger(__name__)
min_max_scaler = preprocessing.MinMaxScaler()
COLUMNS = {
"integrated_charging_parks_df": ["edisgo_id"],
"charging_processes_df": [
"ags",
"car_id",
"destination",
"use_case",
"nominal_charging_capacity_kW",
"grid_charging_capacity_kW",
"chargingdemand_kWh",
"park_time_timesteps",
"park_start_timesteps",
"park_end_timesteps",
],
"simbev_config_df": [
"eta_cp",
"stepsize",
"start_date",
"end_date",
"soc_min",
"grid_timeseries",
"grid_timeseries_by_usecase",
"days",
],
"matching_demand_and_location": ["charging_park_id", "charging_point_id"],
"potential_charging_parks_gdf": [
"ags",
"use_case",
"user_centric_weight",
"geometry",
],
"available_charging_points_df": [
"park_end_timesteps",
"nominal_charging_capacity_kW",
"charging_park_id",
"use_case",
],
}
DTYPES = {
"charging_processes_df": {
"ags": np.uint32,
"car_id": np.uint32,
"destination": str,
"use_case": str,
"nominal_charging_capacity_kW": np.float64,
"grid_charging_capacity_kW": np.float64,
"chargingdemand_kWh": np.float64,
"park_time_timesteps": np.uint16,
"park_start_timesteps": np.uint16,
"park_end_timesteps": np.uint16,
},
"simbev_config_df": {
"eta_cp": float,
"stepsize": int,
"soc_min": float,
"grid_timeseries": bool,
"grid_timeseries_by_usecase": bool,
},
"potential_charging_parks_gdf": {
"ags": np.uint32,
"use_case": str,
"user_centric_weight": np.float64,
},
}
KEEP_COLS = {"potential_charging_parks_gdf": ["user_centric_weight", "geometry"]}
USECASES = ["hpc", "public", "home", "work"]
PRIVATE_DESTINATIONS = {
"0_work": "work",
"6_home": "home",
}
[docs]
def import_electromobility_from_dir(
edisgo_obj: EDisGo,
simbev_directory: PurePath | str,
tracbev_directory: PurePath | str,
**kwargs,
):
"""
Import electromobility data from
`SimBEV <https://github.com/rl-institut/simbev>`_ and
`TracBEV <https://github.com/rl-institut/tracbev>`_ from directory.
Parameters
----------
edisgo_obj : :class:`~.EDisGo`
simbev_directory : str or pathlib.PurePath
SimBEV directory holding SimBEV data.
tracbev_directory : str or pathlib.PurePath
TracBEV directory holding TracBEV data.
kwargs :
Kwargs may contain any further attributes you want to specify.
gc_to_car_rate_home : float
Specifies the minimum rate between potential charging parks
points for the use case "home" and the total number of cars.
Default 0.5 .
gc_to_car_rate_work : float
Specifies the minimum rate between potential charging parks
points for the use case "work" and the total number of cars.
Default 0.25 .
gc_to_car_rate_public : float
Specifies the minimum rate between potential charging parks
points for the use case "public" and the total number of cars.
Default 0.1 .
gc_to_car_rate_hpc : float
Specifies the minimum rate between potential charging parks
points for the use case "hpc" and the total number of cars.
Default 0.005 .
mode_parking_times : str
If the mode_parking_times is set to "frugal" only parking times
with any charging demand are imported. Default "frugal".
charging_processes_dir : str
Charging processes sub-directory. Default None.
simbev_config_file : str
Name of the simbev config file. Default "metadata_simbev_run.json".
"""
# TODO: SimBEV is in development and this import will need constant
# updating for now
edisgo_obj.electromobility.charging_processes_df = read_csvs_charging_processes(
simbev_directory,
mode=kwargs.pop("mode_parking_times", "frugal"),
csv_dir=kwargs.pop("charging_processes_dir", None),
)
edisgo_obj.electromobility.simbev_config_df = read_simbev_config_df(
simbev_directory,
edisgo_obj,
simbev_config_file=kwargs.pop("simbev_config_file", "metadata_simbev_run.json"),
)
potential_charging_parks_gdf = read_gpkg_potential_charging_parks(
tracbev_directory,
edisgo_obj,
)
edisgo_obj.electromobility.potential_charging_parks_gdf = (
assure_minimum_potential_charging_parks(
edisgo_obj=edisgo_obj,
potential_charging_parks_gdf=potential_charging_parks_gdf,
**kwargs,
)
)
[docs]
def read_csvs_charging_processes(csv_path, mode="frugal", csv_dir=None):
"""
Reads all CSVs in a given path and returns a DataFrame with all
`SimBEV <https://github.com/rl-institut/simbev>`_ charging processes.
Parameters
----------
csv_path : str
Main path holding SimBEV output data
mode : str
Returns all information if None. Returns only rows with charging
demand greater than 0 if "frugal". Default: "frugal".
csv_dir : str
Optional sub-directory holding charging processes CSVs under path.
Default: None.
Returns
-------
:pandas:`pandas.DataFrame<DataFrame>`
DataFrame with AGS, car ID, trip destination, charging use case
(private or public), netto charging capacity, charging demand,
charge start, charge end, potential charging park ID and charging point
ID.
"""
if csv_dir is not None:
csv_path = os.path.join(csv_path, csv_dir)
files = []
for dirpath, dirnames, filenames in os.walk(csv_path):
files.extend(
Path(os.path.join(dirpath, f)) for f in filenames if f.endswith(".csv")
)
if not files:
raise ValueError(f"Couldn't find any CSVs in path {csv_path}.")
files.sort()
# wrapper function for csv files read in with map_except function
def rd_csv(file):
ags = int(file[1].parts[-2])
car_id = file[0]
try:
return pd.read_csv(file[1]).assign(ags=ags, car_id=car_id)
except Exception:
logger.warning(f"File '{file[1]}' couldn't be read and is skipped.")
return pd.DataFrame()
df = pd.concat(map(rd_csv, list(enumerate(files))), ignore_index=True)
if mode == "frugal":
df = df.loc[df.chargingdemand_kWh > 0]
df = df.rename(columns={"location": "destination"})
df = df[COLUMNS["charging_processes_df"]].astype(DTYPES["charging_processes_df"])
return pd.merge(
df,
pd.DataFrame(columns=COLUMNS["matching_demand_and_location"]),
how="outer",
left_index=True,
right_index=True,
)
[docs]
def read_simbev_config_df(
path, edisgo_obj, simbev_config_file="metadata_simbev_run.json"
):
"""
Get `SimBEV <https://github.com/rl-institut/simbev>`_ config data.
Parameters
----------
path : str
Main path holding SimBEV output data.
edisgo_obj : :class:`~.EDisGo`
simbev_config_file : str
SimBEV config file name. Default: "metadata_simbev_run.json".
Returns
-------
:pandas:`pandas.DataFrame<DataFrame>`
DataFrame with used random seed, used threads, stepsize in minutes,
year, scenarette, simulated days, maximum number of cars per AGS,
completed standing times and time series per AGS and used ramp up
data CSV.
"""
try:
if simbev_config_file is not None:
with open(os.path.join(path, simbev_config_file)) as f:
data = json.load(f)
df = pd.DataFrame.from_dict(
data["config"]["basic"], orient="index"
).T.astype(DTYPES["simbev_config_df"])
for col in ["start_date", "end_date"]:
df[col] = pd.to_datetime(df[col])
return df.assign(days=(df.end_date - df.start_date).iat[0].days + 1)
except Exception:
logger.warning(
"SimBEV config file could not be imported. Charging point "
"efficiency is set to 100%, the stepsize is set to 15 minutes "
"and the simulated days are estimated from the charging "
"processes."
)
mx_t = edisgo_obj.electromobility.charging_processes_df.park_end_timesteps.max()
data = {
"eta_cp": [1.0],
"stepsize": [15],
"days": [np.ceil(mx_t / (4 * 24))],
}
return pd.DataFrame(data=data, index=[0])
[docs]
def read_gpkg_potential_charging_parks(path, edisgo_obj):
"""
Get GeoDataFrame with all
`TracBEV <https://github.com/rl-institut/tracbev>`_ potential charging parks.
Parameters
----------
path : str
Main path holding TracBEV data.
edisgo_obj : :class:`~.EDisGo`
Returns
-------
:geopandas:`GeoDataFrame`
GeoDataFrame with AGS, charging use case (home, work, public or
hpc), user-centric weight and geometry.
"""
files = [f for f in os.listdir(path) if f.endswith(".gpkg")]
potential_charging_parks_gdf_list = []
if isinstance(path, str):
path = Path(path)
for f in files:
gdf = gpd.read_file(path / f)
if "undefined" in gdf.crs.name.lower():
gdf = gdf.set_crs(epsg=3035, allow_override=True).to_crs(
epsg=edisgo_obj.topology.grid_district["srid"]
)
else:
gdf = gdf.to_crs(epsg=edisgo_obj.topology.grid_district["srid"])
gdf = gdf.rename(
columns={
"charge_spots": "user_centric_weight",
"potential": "user_centric_weight",
}
)
# drop unnecessary columns
gdf = gdf[KEEP_COLS["potential_charging_parks_gdf"]]
# add ags and use case info as well as normalize weights 0..1
gdf = gdf.assign(
user_centric_weight=min_max_scaler.fit_transform(
gdf.user_centric_weight.values.reshape(-1, 1)
),
ags=int(f.split(".")[0].split("_")[-1]),
use_case=f.split(".")[0].split("_")[-2],
)
potential_charging_parks_gdf_list.append(gdf)
potential_charging_parks_gdf = gpd.GeoDataFrame(
pd.concat(
potential_charging_parks_gdf_list,
ignore_index=True,
),
crs=potential_charging_parks_gdf_list[0].crs,
)
return potential_charging_parks_gdf
[docs]
def assure_minimum_potential_charging_parks(
edisgo_obj: EDisGo,
potential_charging_parks_gdf: gpd.GeoDataFrame,
**kwargs,
):
# ensure minimum number of potential charging parks per car
num_cars = len(edisgo_obj.electromobility.charging_processes_df.car_id.unique())
for use_case in USECASES:
if use_case == "home":
gc_to_car_rate = kwargs.get("gc_to_car_rate_home", 0.5)
elif use_case == "work":
gc_to_car_rate = kwargs.get("gc_to_car_rate_work", 0.25)
elif use_case == "public":
gc_to_car_rate = kwargs.get("gc_to_car_rate_public", 0.1)
elif use_case == "hpc":
gc_to_car_rate = kwargs.get("gc_to_car_rate_hpc", 0.005)
use_case_gdf = potential_charging_parks_gdf.loc[
potential_charging_parks_gdf.use_case == use_case
]
num_gcs = len(use_case_gdf)
# if tracbev doesn't provide possible grid connections choose a
# random public potential charging park and duplicate
if num_gcs == 0:
logger.warning(
f"There are no potential charging parks for use case {use_case}. "
f"Therefore 10 % of public potential charging parks are duplicated "
f"randomly and assigned to use case {use_case}."
)
public_gcs = potential_charging_parks_gdf.loc[
potential_charging_parks_gdf.use_case == "public"
]
random_gcs = public_gcs.sample(
int(np.ceil(len(public_gcs) / 10)),
random_state=edisgo_obj.topology.mv_grid.id,
).assign(use_case=use_case)
potential_charging_parks_gdf = pd.concat(
[
potential_charging_parks_gdf,
random_gcs,
],
ignore_index=True,
)
use_case_gdf = potential_charging_parks_gdf.loc[
potential_charging_parks_gdf.use_case == use_case
]
num_gcs = len(use_case_gdf)
# escape zero division
actual_gc_to_car_rate = np.Infinity if num_cars == 0 else num_gcs / num_cars
# duplicate potential charging parks until desired quantity is ensured
max_it = 50
n = 0
while actual_gc_to_car_rate < gc_to_car_rate and n < max_it:
logger.info(
f"Duplicating potential charging parks to meet the desired grid "
f"connections to cars rate of {gc_to_car_rate*100:.2f} % for use case "
f"{use_case}. Iteration: {n+1}."
)
if actual_gc_to_car_rate * 2 < gc_to_car_rate:
potential_charging_parks_gdf = pd.concat(
[
potential_charging_parks_gdf,
use_case_gdf,
],
ignore_index=True,
)
else:
extra_gcs = (
int(np.ceil(num_gcs * gc_to_car_rate / actual_gc_to_car_rate))
- num_gcs
)
extra_gdf = use_case_gdf.sample(
n=extra_gcs, random_state=edisgo_obj.topology.mv_grid.id
)
potential_charging_parks_gdf = pd.concat(
[
potential_charging_parks_gdf,
extra_gdf,
],
ignore_index=True,
)
use_case_gdf = potential_charging_parks_gdf.loc[
potential_charging_parks_gdf.use_case == use_case
]
num_gcs = len(use_case_gdf)
actual_gc_to_car_rate = num_gcs / num_cars
n += 1
# sort GeoDataFrame
potential_charging_parks_gdf = potential_charging_parks_gdf.sort_values(
by=["use_case", "ags", "user_centric_weight"], ascending=[True, True, False]
).reset_index(drop=True)
# in case of polygons use the centroid as potential charging parks point
# and set crs to match edisgo object
return (
potential_charging_parks_gdf.assign(
geometry=potential_charging_parks_gdf.geometry.representative_point()
)
.to_crs(epsg=edisgo_obj.topology.grid_district["srid"])
.astype(DTYPES["potential_charging_parks_gdf"])
)
[docs]
def distribute_charging_demand(edisgo_obj, **kwargs):
"""
Distribute charging demand from SimBEV onto potential charging parks from TracBEV.
Parameters
----------
edisgo_obj : :class:`~.EDisGo`
kwargs :
Kwargs may contain any further attributes you want to specify.
mode : str
Distribution mode. If the mode is set to "user_friendly" only the
simbev weights are used for the distribution. If the mode is
"grid_friendly" also grid conditions are respected.
Default "user_friendly".
generators_weight_factor : float
Weighting factor of the generators weight within an LV grid in
comparison to the loads weight. Default 0.5.
distance_weight : float
Weighting factor for the distance between a potential charging park
and its nearest substation in comparison to the combination of
the generators and load factors of the LV grids.
Default 1 / 3.
user_friendly_weight : float
Weighting factor of the user friendly weight in comparison to the
grid friendly weight. Default 0.5.
"""
distribute_private_charging_demand(edisgo_obj)
distribute_public_charging_demand(edisgo_obj, **kwargs)
[docs]
def get_weights_df(edisgo_obj, potential_charging_park_indices, **kwargs):
"""
Get weights per potential charging point for a given set of grid connection indices.
Parameters
----------
edisgo_obj : :class:`~.EDisGo`
potential_charging_park_indices : list
List of potential charging parks indices
Other Parameters
-----------------
mode : str
Only use user friendly weights ("user_friendly") or combine with
grid friendly weights ("grid_friendly"). Default: "user_friendly".
user_friendly_weight : float
Weight of user friendly weight if mode "grid_friendly". Default: 0.5.
distance_weight: float
Grid friendly weight is a combination of the installed capacity of
generators and loads within a LV grid and the distance towards the
nearest substation. This parameter sets the weight for the distance
parameter. Default: 1/3.
Returns
-------
:pandas:`pandas.DataFrame<DataFrame>`
DataFrame with numeric weights
"""
def _get_lv_grid_weights():
"""
DataFrame containing technical data of LV grids.
Returns
--------
:pandas:`pandas.DataFrame<DataFrame>`
Columns of the DataFrame are:
peak_generation_capacity : float
Cumulative peak generation capacity of generators in the network in
MW.
p_set : float
Cumulative peak load of loads in the network in MW.
substation_capacity : float
Cumulative capacity of transformers to overlaying network.
generators_weight : float
Weighting used in grid friendly siting of public charging points.
In the case of generators the weight is defined by dividing the
peak_generation_capacity by substation_capacity and norming the
results from 0 .. 1. A higher weight is more attractive.
loads_weight : float
Weighting used in grid friendly siting of public charging points.
In the case of loads the weight is defined by dividing the
p_set by substation_capacity and norming the results from 0 .. 1.
The result is then substracted from 1 as the higher the p_set is
in relation to the substation_capacity the less attractive this LV
grid is for new loads from a grid perspective. A higher weight is
more attractive.
"""
lv_grids = list(edisgo_obj.topology.mv_grid.lv_grids)
lv_grids_df = pd.DataFrame(
index=[_._id for _ in lv_grids],
columns=[
"peak_generation_capacity",
"substation_capacity",
"generators_weight",
"p_set",
"loads_weight",
],
)
lv_grids_df.peak_generation_capacity = [
_.peak_generation_capacity for _ in lv_grids
]
lv_grids_df.substation_capacity = [
_.transformers_df.s_nom.sum() for _ in lv_grids
]
min_max_scaler = preprocessing.MinMaxScaler()
lv_grids_df.generators_weight = lv_grids_df.peak_generation_capacity.divide(
lv_grids_df.substation_capacity
)
lv_grids_df.generators_weight = min_max_scaler.fit_transform(
lv_grids_df.generators_weight.values.reshape(-1, 1)
)
lv_grids_df.p_set = [_.p_set for _ in lv_grids]
lv_grids_df.loads_weight = lv_grids_df.p_set.divide(
lv_grids_df.substation_capacity
)
lv_grids_df.loads_weight = 1 - min_max_scaler.fit_transform(
lv_grids_df.loads_weight.values.reshape(-1, 1)
)
return lv_grids_df
mode = kwargs.get("mode", "user_friendly")
if mode == "user_friendly":
weights = [
_.user_centric_weight
for _ in edisgo_obj.electromobility.potential_charging_parks
if _.id in potential_charging_park_indices
]
elif mode == "grid_friendly":
potential_charging_parks = list(
edisgo_obj.electromobility.potential_charging_parks
)
user_friendly_weights = [
_.user_centric_weight
for _ in potential_charging_parks
if _.id in potential_charging_park_indices
]
lv_grids_df = _get_lv_grid_weights()
generators_weight_factor = kwargs.get("generators_weight_factor", 0.5)
loads_weight_factor = 1 - generators_weight_factor
combined_weights = (
generators_weight_factor * lv_grids_df["generators_weight"]
+ loads_weight_factor * lv_grids_df["loads_weight"]
)
lv_grid_ids = [
_.nearest_substation["lv_grid_id"] for _ in potential_charging_parks
]
load_and_generator_capacity_weights = [
combined_weights.at[lv_grid_id] for lv_grid_id in lv_grid_ids
]
# fmt: off
distance_weights = (
edisgo_obj.electromobility._potential_charging_parks_df.distance_weight
.tolist()
)
# fmt: on
distance_weight = kwargs.get("distance_weight", 1 / 3)
grid_friendly_weights = [
(1 - distance_weight) * load_and_generator_capacity_weights[i]
+ distance_weight * distance_weights[i]
for i in range(len(distance_weights))
]
user_friendly_weight = kwargs.get("user_friendly_weight", 0.5)
weights = [
(1 - user_friendly_weight) * grid_friendly_weights[i]
+ user_friendly_weight * user_friendly_weights[i]
for i in range(len(grid_friendly_weights))
]
else:
raise ValueError(
"Provided mode is not valid, needs to be 'user_friendly' or "
"'grid_friendly'."
)
return pd.DataFrame(weights)
[docs]
def normalize(weights_df):
"""
Normalize a given DataFrame so that its sum equals 1 and return a
flattened Array.
Parameters
----------
weights_df : :pandas:`pandas.DataFrame<DataFrame>`
DataFrame with single numeric column
Returns
-------
Numpy 1-D array
Array with normalized weights
"""
if weights_df.sum().sum() == 0:
return np.array([1 / len(weights_df) for _ in range(len(weights_df))])
else:
return weights_df.divide(weights_df.sum().sum()).T.to_numpy().flatten()
[docs]
def combine_weights(
potential_charging_park_indices, designated_charging_point_capacity_df, weights_df
):
"""
Add designated charging capacity weights into the initial weights and
normalize weights
Parameters
----------
potential_charging_park_indices : list
List of potential charging parks indices
designated_charging_point_capacity_df :
:pandas:`pandas.DataFrame<DataFrame>`
DataFrame with designated charging point capacity per potential
charging park
weights_df : :pandas:`pandas.DataFrame<DataFrame>`
DataFrame with initial user or combined weights
Returns
-------
Numpy 1-D array
Array with normalized weights
"""
capacity_df = designated_charging_point_capacity_df.loc[
potential_charging_park_indices
]
capacity_weights = (
1
- min_max_scaler.fit_transform(
capacity_df.designated_charging_point_capacity.values.reshape(-1, 1)
)
).flatten()
user_df = weights_df.loc[potential_charging_park_indices]
user_df[0] += capacity_weights
return normalize(user_df)
[docs]
def weighted_random_choice(
edisgo_obj,
potential_charging_park_indices,
car_id,
destination,
charging_point_id,
normalized_weights,
rng=None,
):
"""
Weighted random choice of a potential charging park. Setting the chosen
values into :obj:`~.network.electromobility.Electromobility.charging_processes_df`
Parameters
----------
edisgo_obj : :class:`~.EDisGo`
potential_charging_park_indices : list
List of potential charging parks indices
car_id : int
Car ID
destination : str
Trip destination
charging_point_id : int
Charging Point ID
normalized_weights : Numpy 1-D array
Array with normalized weights
rng : Numpy random generator
If None a random generator with seed=charging_point_id is
initialized
Returns
-------
:obj:`int`
Chosen Charging Park ID
"""
if rng is None:
rng = default_rng(seed=charging_point_id)
charging_park_id = rng.choice(
a=potential_charging_park_indices,
p=normalized_weights,
)
edisgo_obj.electromobility.charging_processes_df.loc[
(edisgo_obj.electromobility.charging_processes_df.car_id == car_id)
& (edisgo_obj.electromobility.charging_processes_df.destination == destination)
] = edisgo_obj.electromobility.charging_processes_df.loc[
(edisgo_obj.electromobility.charging_processes_df.car_id == car_id)
& (edisgo_obj.electromobility.charging_processes_df.destination == destination)
].assign(
charging_park_id=charging_park_id,
charging_point_id=charging_point_id,
)
return charging_park_id
[docs]
def distribute_private_charging_demand(edisgo_obj):
"""
Distributes all private charging processes. Each car gets its own
private charging point if a charging process takes place.
Parameters
----------
edisgo_obj : :class:`~.EDisGo`
"""
try:
rng = default_rng(seed=edisgo_obj.topology.id)
except Exception:
rng = None
private_charging_df = edisgo_obj.electromobility.charging_processes_df.loc[
(edisgo_obj.electromobility.charging_processes_df.chargingdemand_kWh > 0)
& edisgo_obj.electromobility.charging_processes_df.use_case.isin(
["home", "work"]
)
]
charging_point_id = 0
user_centric_weights_df = get_weights_df(
edisgo_obj, edisgo_obj.electromobility.potential_charging_parks_gdf.index
)
designated_charging_point_capacity_df = pd.DataFrame(
index=user_centric_weights_df.index,
columns=["designated_charging_point_capacity"],
data=0,
)
for destination in private_charging_df.destination.sort_values().unique():
private_charging_destination_df = private_charging_df.loc[
private_charging_df.destination == destination
]
use_case = PRIVATE_DESTINATIONS[destination]
if use_case == "work":
potential_charging_park_indices = (
edisgo_obj.electromobility.potential_charging_parks_gdf.loc[
edisgo_obj.electromobility.potential_charging_parks_gdf.use_case
== use_case
].index
)
for car_id in private_charging_destination_df.car_id.sort_values().unique():
weights = combine_weights(
potential_charging_park_indices,
designated_charging_point_capacity_df,
user_centric_weights_df,
)
charging_park_id = weighted_random_choice(
edisgo_obj,
potential_charging_park_indices,
car_id,
destination,
charging_point_id,
weights,
rng=rng,
)
charging_capacity = (
private_charging_destination_df.loc[
(private_charging_destination_df.car_id == car_id)
& (private_charging_destination_df.destination == "0_work")
].nominal_charging_capacity_kW.iat[0]
/ edisgo_obj.electromobility.eta_charging_points
)
designated_charging_point_capacity_df.at[
charging_park_id, "designated_charging_point_capacity"
] += charging_capacity
charging_point_id += 1
elif use_case == "home":
for ags in private_charging_destination_df.ags.sort_values().unique():
private_charging_ags_df = private_charging_destination_df.loc[
private_charging_destination_df.ags == ags
]
# fmt: off
potential_charging_park_indices = edisgo_obj.electromobility.\
potential_charging_parks_gdf.loc[
(
edisgo_obj.electromobility.potential_charging_parks_gdf.ags
== ags
)
& (
edisgo_obj.electromobility.potential_charging_parks_gdf.
use_case == use_case
)
].index
# fmt: on
for car_id in private_charging_ags_df.car_id.sort_values().unique():
weights = combine_weights(
potential_charging_park_indices,
designated_charging_point_capacity_df,
user_centric_weights_df,
)
weighted_random_choice(
edisgo_obj,
potential_charging_park_indices,
car_id,
destination,
charging_point_id,
weights,
rng=rng,
)
charging_capacity = private_charging_destination_df.loc[
(private_charging_destination_df.car_id == car_id)
& (private_charging_destination_df.destination == "6_home")
].nominal_charging_capacity_kW.iat[0]
designated_charging_point_capacity_df.at[
charging_park_id, "designated_charging_point_capacity"
] += charging_capacity
charging_point_id += 1
else:
raise ValueError(f"Destination {destination} is unknown.")
[docs]
def distribute_public_charging_demand(edisgo_obj, **kwargs):
"""
Distributes all public charging processes. For each process it is
checked if a matching charging point exists to minimize the
number of charging points.
Parameters
----------
edisgo_obj : :class:`~.EDisGo`
"""
public_charging_df = edisgo_obj.electromobility.charging_processes_df.loc[
(edisgo_obj.electromobility.charging_processes_df.chargingdemand_kWh > 0)
& edisgo_obj.electromobility.charging_processes_df.use_case.isin(
["public", "hpc"]
)
].sort_values(
by=["park_start_timesteps", "park_end_timesteps"],
ascending=[True, True],
)
try:
rng = default_rng(seed=edisgo_obj.topology.id)
except Exception:
rng = default_rng(seed=1)
available_charging_points_df = pd.DataFrame(
columns=COLUMNS["available_charging_points_df"]
)
grid_and_user_centric_weights_df = get_weights_df(
edisgo_obj,
edisgo_obj.electromobility.potential_charging_parks_gdf.index,
**kwargs,
)
designated_charging_point_capacity_df = pd.DataFrame(
index=grid_and_user_centric_weights_df.index,
columns=["designated_charging_point_capacity"],
data=0,
)
columns = [
"destination",
"use_case",
"park_start_timesteps",
"park_end_timesteps",
"nominal_charging_capacity_kW",
]
for (
idx,
destination,
use_case,
park_start_timesteps,
park_end_timesteps,
nominal_charging_capacity_kW,
) in public_charging_df[columns].itertuples():
matching_charging_points_df = available_charging_points_df.loc[
(available_charging_points_df.park_end_timesteps < park_start_timesteps)
& (
available_charging_points_df.nominal_charging_capacity_kW.round(1)
== round(nominal_charging_capacity_kW, 1)
)
]
if len(matching_charging_points_df) > 0:
potential_charging_park_indices = matching_charging_points_df.index
weights = normalize(
grid_and_user_centric_weights_df.loc[
matching_charging_points_df.charging_park_id
]
)
charging_point_s = matching_charging_points_df.loc[
rng.choice(a=potential_charging_park_indices, p=weights)
]
edisgo_obj.electromobility.charging_processes_df.at[
idx, "charging_park_id"
] = charging_point_s["charging_park_id"]
edisgo_obj.electromobility.charging_processes_df.at[
idx, "charging_point_id"
] = charging_point_s.name
available_charging_points_df.at[
charging_point_s.name, "park_end_timesteps"
] = park_end_timesteps
else:
potential_charging_park_indices = (
edisgo_obj.electromobility.potential_charging_parks_gdf.loc[
(
edisgo_obj.electromobility.potential_charging_parks_gdf.use_case
== use_case
)
].index
)
weights = combine_weights(
potential_charging_park_indices,
designated_charging_point_capacity_df,
grid_and_user_centric_weights_df,
)
charging_park_id = rng.choice(
a=potential_charging_park_indices,
p=weights,
)
# fmt: off
charging_point_id = (
edisgo_obj.electromobility.charging_processes_df.charging_point_id
.max()
+ 1
)
# fmt: on
if charging_point_id != charging_point_id:
charging_point_id = 0
edisgo_obj.electromobility.charging_processes_df.at[
idx, "charging_park_id"
] = charging_park_id
edisgo_obj.electromobility.charging_processes_df.at[
idx, "charging_point_id"
] = charging_point_id
available_charging_points_df.loc[
charging_point_id
] = edisgo_obj.electromobility.charging_processes_df.loc[
idx, available_charging_points_df.columns
].tolist()
designated_charging_point_capacity_df.at[
charging_park_id, "designated_charging_point_capacity"
] += nominal_charging_capacity_kW
[docs]
def determine_grid_connection_capacity(
total_charging_point_capacity, lower_limit=0.3, upper_limit=1.0, minimum_factor=0.45
):
if total_charging_point_capacity <= lower_limit:
return total_charging_point_capacity
elif total_charging_point_capacity >= upper_limit:
return minimum_factor * total_charging_point_capacity
else:
return (
((minimum_factor - 1) / (upper_limit - lower_limit))
* (total_charging_point_capacity - lower_limit)
+ 1
) * total_charging_point_capacity
[docs]
def integrate_charging_parks(edisgo_obj):
"""
Integrates all designated charging parks into the grid.
The charging time series at each charging park are not set in this function.
Parameters
----------
edisgo_obj : :class:`~.EDisGo`
"""
charging_parks = list(edisgo_obj.electromobility.potential_charging_parks)
# Only integrate charging parks with designated charging points
designated_charging_parks = [
cp
for cp in charging_parks
if (cp.designated_charging_point_capacity > 0) and cp.within_grid
]
charging_park_ids = [_.id for _ in designated_charging_parks]
comp_type = "charging_point"
# integrate ChargingPoints and save the names of the eDisGo ID
edisgo_ids = [
edisgo_obj.integrate_component_based_on_geolocation(
comp_type=comp_type,
geolocation=cp.geometry,
sector=cp.use_case,
add_ts=False,
p_set=cp.grid_connection_capacity,
)
for cp in designated_charging_parks
]
edisgo_obj.electromobility.integrated_charging_parks_df = pd.DataFrame(
columns=COLUMNS["integrated_charging_parks_df"],
data=edisgo_ids,
index=charging_park_ids,
)
[docs]
def import_electromobility_from_oedb(
edisgo_obj: EDisGo,
scenario: str,
engine: Engine,
**kwargs,
):
"""
Gets electromobility data for specified scenario from oedb.
Electromobility data includes data on standing times, charging demand,
etc. per vehicle, as well as information on potential charging point locations.
Parameters
----------
edisgo_obj : :class:`~.EDisGo`
scenario : str
Scenario for which to retrieve electromobility data. Possible options
are 'eGon2035' and 'eGon100RE'.
engine : :sqlalchemy:`sqlalchemy.Engine<sqlalchemy.engine.Engine>`
Database engine.
Other Parameters
----------------
kwargs :
Possible options are `gc_to_car_rate_home`, `gc_to_car_rate_work`,
`gc_to_car_rate_public`, `gc_to_car_rate_hpc`, and `mode_parking_times`. See
parameter documentation of `import_electromobility_data_kwds` parameter in
:attr:`~.EDisGo.import_electromobility` for more information.
"""
edisgo_obj.electromobility.charging_processes_df = charging_processes_from_oedb(
edisgo_obj=edisgo_obj, engine=engine, scenario=scenario, **kwargs
)
edisgo_obj.electromobility.simbev_config_df = simbev_config_from_oedb(
scenario=scenario, engine=engine
)
potential_charging_parks_gdf = potential_charging_parks_from_oedb(
edisgo_obj=edisgo_obj, engine=engine, **kwargs
)
edisgo_obj.electromobility.potential_charging_parks_gdf = (
assure_minimum_potential_charging_parks(
edisgo_obj=edisgo_obj,
potential_charging_parks_gdf=potential_charging_parks_gdf,
**kwargs,
)
)
[docs]
def simbev_config_from_oedb(
scenario: str,
engine: Engine,
):
"""
Gets :attr:`~.network.electromobility.Electromobility.simbev_config_df`
for specified scenario from oedb.
Parameters
----------
scenario : str
Scenario for which to retrieve electromobility data. Possible options
are 'eGon2035' and 'eGon100RE'.
engine : :sqlalchemy:`sqlalchemy.Engine<sqlalchemy.engine.Engine>`
Database engine.
Returns
--------
:pandas:`pandas.DataFrame<DataFrame>`
See :attr:`~.network.electromobility.Electromobility.simbev_config_df` for
more information.
"""
saio.register_schema("demand", engine)
from saio.demand import egon_ev_metadata
with session_scope_egon_data(engine) as session:
query = session.query(egon_ev_metadata).filter(
egon_ev_metadata.scenario == scenario
)
df = pd.read_sql(sql=query.statement, con=query.session.bind)
return df.assign(days=(df.end_date - df.start_date).iat[0].days + 1)
[docs]
def potential_charging_parks_from_oedb(
edisgo_obj: EDisGo,
engine: Engine,
):
"""
Gets :attr:`~.network.electromobility.Electromobility.potential_charging_parks_gdf`
data from oedb.
Parameters
----------
edisgo_obj : :class:`~.EDisGo`
engine : :sqlalchemy:`sqlalchemy.Engine<sqlalchemy.engine.Engine>`
Database engine.
Returns
--------
:geopandas:`geopandas.GeoDataFrame<GeoDataFrame>`
See
:attr:`~.network.electromobility.Electromobility.potential_charging_parks_gdf`
for more information.
"""
saio.register_schema("grid", engine)
from saio.grid import egon_emob_charging_infrastructure
crs = edisgo_obj.topology.grid_district["srid"]
with session_scope_egon_data(engine) as session:
srid = get_srid_of_db_table(session, egon_emob_charging_infrastructure.geometry)
query = session.query(
egon_emob_charging_infrastructure.cp_id,
egon_emob_charging_infrastructure.use_case,
egon_emob_charging_infrastructure.weight.label("user_centric_weight"),
egon_emob_charging_infrastructure.geometry.label("geom"),
).filter(egon_emob_charging_infrastructure.mv_grid_id == edisgo_obj.topology.id)
gdf = gpd.read_postgis(
sql=query.statement,
con=query.session.bind,
geom_col="geom",
crs=f"EPSG:{srid}",
index_col="cp_id",
).to_crs(crs)
return gdf.assign(ags=0)
[docs]
def charging_processes_from_oedb(
edisgo_obj: EDisGo, engine: Engine, scenario: str, **kwargs
):
"""
Gets :attr:`~.network.electromobility.Electromobility.charging_processes_df` data
for specified scenario from oedb.
Parameters
----------
edisgo_obj : :class:`~.EDisGo`
engine : :sqlalchemy:`sqlalchemy.Engine<sqlalchemy.engine.Engine>`
Database engine.
scenario : str
Scenario for which to retrieve data. Possible options are 'eGon2035' and
'eGon100RE'.
Other Parameters
----------------
kwargs :
Possible option is `mode_parking_times`. See parameter documentation of
`import_electromobility_data_kwds` parameter in
:attr:`~.EDisGo.import_electromobility` for more information.
Returns
--------
:pandas:`pandas.DataFrame<DataFrame>`
See :attr:`~.network.electromobility.Electromobility.charging_processes_df` for
more information.
"""
saio.register_schema("demand", engine)
from saio.demand import egon_ev_mv_grid_district, egon_ev_trip
# get EV pool in grid
scenario_variation = {"eGon2035": "NEP C 2035", "eGon100RE": "Reference 2050"}
with session_scope_egon_data(engine) as session:
query = session.query(egon_ev_mv_grid_district.egon_ev_pool_ev_id).filter(
egon_ev_mv_grid_district.scenario == scenario,
egon_ev_mv_grid_district.scenario_variation == scenario_variation[scenario],
egon_ev_mv_grid_district.bus_id == edisgo_obj.topology.id,
)
pool = Counter(pd.read_sql(sql=query.statement, con=engine).egon_ev_pool_ev_id)
# get charging processes for each EV ID
with session_scope_egon_data(engine) as session:
query = session.query(
egon_ev_trip.egon_ev_pool_ev_id.label("car_id"),
egon_ev_trip.use_case,
egon_ev_trip.location.label("destination"),
egon_ev_trip.charging_capacity_nominal.label(
"nominal_charging_capacity_kW"
),
egon_ev_trip.charging_capacity_grid.label("grid_charging_capacity_kW"),
egon_ev_trip.charging_demand.label("chargingdemand_kWh"),
egon_ev_trip.park_start.label("park_start_timesteps"),
egon_ev_trip.park_end.label("park_end_timesteps"),
).filter(
egon_ev_trip.scenario == scenario,
egon_ev_trip.egon_ev_pool_ev_id.in_(pool.keys()),
)
if kwargs.get("mode_parking_times", "frugal") == "frugal":
query = query.filter(egon_ev_trip.charging_demand > 0)
ev_trips_df = pd.read_sql(sql=query.statement, con=engine)
# duplicate EVs that were chosen more than once from EV pool
df_list = []
last_id = 0
n_max = max(pool.values())
for i in range(n_max, 0, -1):
evs = sorted([ev_id for ev_id, count in pool.items() if count >= i])
df = ev_trips_df.loc[ev_trips_df.car_id.isin(evs)]
mapping = {ev: count + last_id for count, ev in enumerate(evs)}
df.car_id = df.car_id.map(mapping)
last_id = max(mapping.values()) + 1
df_list.append(df)
df = pd.concat(df_list, ignore_index=True)
# make sure count starts at 0
if df.park_start_timesteps.min() == 1:
df.loc[:, ["park_start_timesteps", "park_end_timesteps"]] -= 1
return df.assign(
ags=0,
park_time_timesteps=df.park_end_timesteps - df.park_start_timesteps + 1,
charging_park_id=np.nan,
charging_point_id=np.nan,
).astype(DTYPES["charging_processes_df"])