Source code for energy_demand.scripts.s_generate_sigmoid

"""Script to fit technology diffusion

This script calculates the three parameters of a sigmoid diffusion
for every technology which is diffused and has a larger service
fraction at the model end year
"""
from collections import defaultdict
import numpy as np
from scipy.optimize import curve_fit
import pylab
import matplotlib.pyplot as plt
import matplotlib
matplotlib.use('Agg') # Used to make it work in linux

from energy_demand.technologies import diffusion_technologies


[docs]def plotout_sigmoid_tech_diff( L_value, technology, xdata, ydata, fit_parameter, plot_crit=False, close_window_crit=True ): """Plot sigmoid diffusion """ def close_event(): """Timer to close window automatically """ plt.close() x = np.linspace(1990, 2110, 300) y = diffusion_technologies.sigmoid_function(x, L_value, *fit_parameter) fig = plt.figure() #creating a timer object and setting an interval timer = fig.canvas.new_timer(interval=555) timer.add_callback(close_event) fig.set_size_inches(12, 8) pylab.plot(xdata, ydata, 'o', label='base year and future market share') pylab.plot(x, y, label='fit') pylab.ylim(0, 1.05) pylab.legend(loc='best') pylab.xlabel('Time') pylab.ylabel('Market share of technology on energy service') pylab.title("Sigmoid diffusion of technology {}".format(technology)) if plot_crit: if close_window_crit: pylab.show() else: timer.start() pylab.show() pass else: pass
[docs]def calc_sigmoid_parameters( l_value, xdata, ydata, fit_assump_init=0.001, error_range=0.00002, number_of_iterations=1000 ): """Calculate sigmoid parameters. Check if fitting is good enough. Arguments ---------- l_value : float Maximum upper level xdata : array X data ydata : array Y data fit_assump_init : float Small value for correct sigmoid diffusion in case start value is equal to L fit_crit_max : float Criteria to control and abort fit fit_crit_min : float Criteria to control and abort fit (slope must be posititive) error_range : float,default=0.00002 Allowed fitting offset in percent Note ------- `error_range` can be changed if the plotting is weird. If you increase chances are however higher that the fitting does not work anymore. Returns ------ fit_parameter : array Parameters (first position: midpoint, second position: slope) """ # --------------------------------------------- # Generate possible starting parameters for fit # --------------------------------------------- start_param_list = [0.0, 1.0, 0.0001, 0.001, 0.01] # --------------------------------------------- # Fit # --------------------------------------------- cnt = 0 successfull = False while not successfull: try: start_parameters = [ round(start_param_list[cnt], 3), round(start_param_list[cnt], 3)] # ------------------------------------------------ # Test if parameter[1] should be minus or positive # ------------------------------------------------ if ydata[0] < ydata[1]: # end point has higher value crit_plus_minus = 'plus' else: crit_plus_minus = 'minus' if l_value == ydata[0]: l_value += fit_assump_init # Select start parameters depending on pos or neg diff if crit_plus_minus == 'minus': start_parameters[0] *= 1 start_parameters[1] *= -1 #logging.debug(" patameters {} {} {} {}".format( # xdata, ydata, start_parameters, l_value)) # Fit function fit_parameter = fit_sigmoid_diffusion( l_value, xdata, ydata, start_parameters, number_of_iterations=number_of_iterations) # Test if start paramters are identical to fitting parameters if (crit_plus_minus == 'plus' and fit_parameter[1] < 0) or ( crit_plus_minus == 'minus' and fit_parameter[1] > 0): cnt += 1 if cnt >= len(start_param_list): raise Exception("Error: Sigmoid curve fitting failed") else: if (fit_parameter[0] == start_parameters[0]) or ( fit_parameter[1] == start_parameters[1]): cnt += 1 if cnt >= len(start_param_list): raise Exception("Error: Sigmoid curve fitting failed") else: # Check how good the fit is y_calculated_ey = diffusion_technologies.sigmoid_function( x_value=xdata[1], l_value=l_value, midpoint=fit_parameter[0], steepness=fit_parameter[1]) y_calculated_by = diffusion_technologies.sigmoid_function( x_value=xdata[0], l_value=l_value, midpoint=fit_parameter[0], steepness=fit_parameter[1]) fit_measure_p_by = float((100.0 / ydata[0]) * y_calculated_by) fit_measure_p_ey = float((100.0 / ydata[1]) * y_calculated_ey) if (fit_measure_p_ey < (100.0 - error_range) or fit_measure_p_ey > (100.0 + error_range)) or ( fit_measure_p_by < (100.0 - error_range) or fit_measure_p_by > (100.0 + error_range)): #logging.debug("... Fitting measure %s %s (percent) is not good enough", fit_measure_p_by, fit_measure_p_ey) successfull = False cnt += 1 else: successfull = True '''plotout_sigmoid_tech_diff( l_value, "FINISHED FITTING", xdata, ydata, fit_parameter, plot_crit=True, close_window_crit=True)''' except (RuntimeError, IndexError): cnt += 1 if cnt >= len(start_param_list): raise Exception( "Fitting did not work: Check whether start year is <= the year 2000") return fit_parameter
[docs]def fit_sigmoid_diffusion( l_value, x_data, y_data, start_parameters, number_of_iterations=10000 ): """Fit sigmoid curve based on two points on the diffusion curve Arguments ---------- l_value : float The sigmoids curve maximum value (max consumption) x_data : array X coordinate of two points y_data : array X coordinate of two points number_of_iterations : int Number of iterations used for sigmoid fitting Returns ------- popt : dict Fitting parameters (l_value, midpoint, steepness) Note ---- The Sigmoid is substacted - 2000 to allow for better fit with low values Warning ------- It cannot fit a value starting from 0. Therefore, some initial penetration needs to be assumed (e.g. 0.001%) RuntimeWarning is ignored https://stackoverflow.com/questions/4359959/overflow-in-exp-in-scipy-numpy-in-python """ def sigmoid_fitting_function(x_value, x0_value, k_value): """Sigmoid function used for fitting """ with np.errstate(over='ignore'): #RuntimeWarning: overflow encountered in exp y_value = l_value / (1 + np.exp(-k_value * ((x_value - 2000.0) - x0_value))) return y_value popt, _ = curve_fit( sigmoid_fitting_function, x_data, y_data, p0=start_parameters, maxfev=number_of_iterations) return popt
[docs]def tech_l_sigmoid( s_tech_switched_ey, enduse_fuel_switches, technologies, installed_tech, s_fueltype_by_p, s_tech_by_p, fuel_tech_p_by ): """Calculate L value (maximum possible theoretical market penetration) for every installed technology with maximum theoretical replacement share (calculate second sigmoid point). Arguments ---------- s_tech_switched_ey : dict Ey service tech enduse_fuel_switches : dict Fuel switches of enduse installed_tech : dict Installed technologies (as keys) s_fueltype_by_p : dict Service share per fueltye of base year s_tech_by_p : dict Percentage of service demands for every technology fuel_tech_p_by : dict Fuel share per technology of base year Returns ------- l_values_sig : dict L value for sigmoid diffusion of all technologies for which a switch is implemented """ l_values_sig = {} # Check wheter there are technologies in this enduse which are switched if installed_tech == []: pass else: # Iterite list with enduses where fuel switches are defined for technology in installed_tech: # If decreasing technology, L-Value stays initial value if s_tech_by_p[technology] > s_tech_switched_ey[technology]: l_values_sig[technology] = s_tech_by_p[technology] else: # Calculate maximum service demand for specific tech tech_install_p = calc_service_fuel_switched( enduse_fuel_switches, technologies, s_fueltype_by_p, s_tech_by_p, fuel_tech_p_by, 'max_switch') # Read L-values with calculating maximum sigmoid theoretical diffusion l_values_sig[technology] = tech_install_p[technology] return l_values_sig
[docs]def calc_service_fuel_switched( fuel_switches, technologies, s_fueltype_by_p, s_tech_by_p, fuel_tech_p_by, switch_type ): """Calculate energy service demand percentages after fuel switches. Arguments ---------- fuel_switches : dict Fuel switches of a specific enduse technologies : dict Technologies s_fueltype_by_p : dict Service share per fueltype in base year s_tech_by_p : dict Share of service demand per technology for base year fuel_tech_p_by : dict Fuel shares for each technology of an enduse for base year switch_type : str If either switch is for maximum theoretical switch of with defined switch until end year Return ------ s_tech_switched_p : dict Service in future year with added and substracted service demand for every technology Note ---- Assertion may be removed to increase speed """ s_tech_switched_p = {} for fuel_switch in fuel_switches: tech_install = fuel_switch.technology_install tech_replace_fueltype = fuel_switch.fueltype_replace # Share of energy service of repalced fueltype before switch in base year service_p_by = s_fueltype_by_p[tech_replace_fueltype] # Service share of fueltype that will be switched if switch_type == 'max_switch': s_diff_fueltype_by_p = service_p_by * technologies[tech_install].tech_max_share elif switch_type == 'actual_switch': s_diff_fueltype_by_p = service_p_by * fuel_switch.fuel_share_switched_ey # ---------------- # Service addition # ---------------- s_tech_switched_p[tech_install] = s_tech_by_p[tech_install] + s_diff_fueltype_by_p # ---------------- # Service substraction # ---------------- # Iterate technologies which are replaced for this fueltype and substract service demand proportionally # Technologies with lower demands technologies_replaced = list(fuel_tech_p_by[tech_replace_fueltype].keys()) # Calculate proportional share of technologies in replaced fueltype in by tot_by_share = 0 for tech in technologies_replaced: tot_by_share += s_tech_by_p[tech] # Substract switched share proportionally to base year service in fueltype for tech in technologies_replaced: # Because of rounding error otherwise minus values possible round_digits = 5 # Relative share in by rel_tech_by_p = s_tech_by_p[tech] / tot_by_share # Substract (service_by - service to switch * relative share) s_tech_switched_p[tech] = round(s_tech_by_p[tech], round_digits) - round(s_diff_fueltype_by_p * rel_tech_by_p, round_digits) assert s_tech_switched_p[tech] >= 0 # ----------------------- # Calculate service fraction of all technologies in # enduse not affected by fuel switch # ----------------------- s_affected_p_ey = sum(s_tech_switched_p.values()) unaffected_service_to_distr_p = 1 - s_affected_p_ey # Calculate service fraction of remaining technologies fractions_unaffected_switch = {} for tech, s_tech_p in s_tech_by_p.items(): if tech not in s_tech_switched_p.keys(): fractions_unaffected_switch[tech] = s_tech_p # Sum of unaffected service shares service_tot_remaining = sum(fractions_unaffected_switch.values()) # Get relative distribution of all not affected techs for tech, tech_fraction in fractions_unaffected_switch.items(): # Relative share s_rel_fraction_p = tech_fraction / service_tot_remaining s_tech_switched_p[tech] = s_rel_fraction_p * unaffected_service_to_distr_p assert s_tech_switched_p[tech] >= 0 return dict(s_tech_switched_p)
[docs]def get_tech_installed(enduse, fuel_switches): """Read out all technologies which are specifically switched to of a specific enduse Parameter --------- enduse : str enduse fuel_switches : dict All fuel switches where a share of a fuel of an enduse is switched to a specific technology Return ------ installed_tech : list List with all technologies where a fuel share is switched to crit_fuel_switch : bool Criteria wheter swich is defined or not """ # Add technology list for every enduse with affected switches installed_tech = set([]) for switch in fuel_switches: if switch.enduse == enduse: installed_tech.add(switch.technology_install) if len(list(installed_tech)) > 0: crit_fuel_switch = True else: crit_fuel_switch = False return list(installed_tech), crit_fuel_switch
[docs]def get_l_values( technologies, technologies_to_consider, regions=False ): """Get l values (Maximum shares of each technology) for all installed technologies Arguments ---------- technologies : dict Technologies technologies_to_consider : list Technologies to consider s_fueltype_by_p : Fraction of service per fueltype in base year regions : dict Regions Return ------ l_values_sig : dict Sigmoid paramters """ l_values_sig = defaultdict(dict) for region in regions: if technologies_to_consider == []: pass else: for tech in technologies_to_consider: l_values_sig[region][tech] = technologies[tech].tech_max_share return dict(l_values_sig)
[docs]def tech_sigmoid_parameters( yr_until_switched, switch_yr_start, technologies, l_values, s_tech_by_p, s_tech_switched_p, fit_assump_init=0.001, plot_sigmoid_diffusion=False ): """Calculate sigmoid diffusion parameters based on energy service demand in base year and projected future energy service demand. The future energy servie demand is calculated based on fuel switches. Three potential sigmoid outputs are possible: 'linear': No sigmoid fitting possible because the service in the future year is identical to the service in the base year None: No sigmoid is fitted if the future service share is zero. fit_parameters Sigmoid diffusion parameters Because the sigmoid fitting does not work if the initial and end values are zero, small approximatie values `fit_assump_init` are inserted to allow the function 'calc_sigmoid_parameters' to fit. Arguments ---------- yr_until_switched : int Year until switch is fully realised switch_yr_start : int Start year of story in narrative technologies : dict technologies l_values : dict L values for maximum possible diffusion of technologies s_tech_by_p : dict Energy service demand for base year (1. sigmoid point) s_tech_switched_p : dict Service demand after fuelswitch fit_assump_init : float Approximation helping small number to allow fit plot_sigmoid_diffusion : bool,default=True Criteria whether sigmoid are plotted Returns ------- sig_params : dict Sigmoid diffusion parameters to read energy service demand percentage (not fuel!) Info ----- `rounding_accuracy` This rounds the by and ey service share to the defined number of digits. Because of rounding error, there might be very small differences in percentual service demand. """ def calc_m(x1, x2, y1, y2): m = (y1-y2) / (x1 - x2) return m def calc_c(m, x1, y1): c = y1 - (m * x1) return c rounding_accuracy = 4 # Criteria how much difference in % can be rounded linear_approx_crit = 0.001 # Criteria to simplify with linear approximation if difference is smaller (decimal) error_range = 0.0002 # Error how close the fit must be number_of_iterations = 100 # Number of iterations of sigmoid fitting algorithm # Technologies to apply calculation installed_techs = s_tech_by_p.keys() sig_params = defaultdict(dict) # Fitting criteria where the calculated sigmoid slope and midpoint can be provided limits if installed_techs == []: pass else: for tech in installed_techs: # Test whether technology has the market entry before or after base year, # If afterwards, set very small number in market entry year if technologies[tech].market_entry > switch_yr_start: point_x_by = technologies[tech].market_entry point_y_by = fit_assump_init else: point_x_by = switch_yr_start # Base year point_y_by = s_tech_by_p[tech] # Base year service share # If the base year is the market entry year use a very small number if point_y_by == 0: point_y_by = fit_assump_init #elif point_y_by == 1.0: # point_y_by = 1 - fit_assump_init # Future energy service demand point_x_ey = yr_until_switched point_y_ey = s_tech_switched_p[tech] # If future share is zero, entry small value if point_y_ey == 0: point_y_ey = fit_assump_init elif point_y_ey == 1.0: point_y_ey = 1 - fit_assump_init else: pass # Data of the two points xdata = np.array([point_x_by, point_x_ey]) ydata = np.array([point_y_by, point_y_ey]) # If no change in by to ey but not zero (linear change) if (round(point_y_by, rounding_accuracy) == round(point_y_ey, rounding_accuracy)) and ( point_y_ey != fit_assump_init) and ( point_y_by != fit_assump_init): #if 1 == 1: # Linear diffusion (because by and ey share are identical) sig_params[tech]['midpoint'] = 'linear' sig_params[tech]['steepness'] = 'linear' sig_params[tech]['l_parameter'] = 'linear' # Calculate linear slope and linear y-intercept (with two data points) sig_params[tech]['linear_slope'] = calc_m(xdata[0], xdata[1], ydata[0], ydata[1]) sig_params[tech]['linear_y_intercept'] = calc_c(sig_params[tech]['linear_slope'], xdata[0], ydata[0]) else: # Test if no increase or decrease or if no future potential share if (point_y_by == fit_assump_init and point_y_ey == fit_assump_init) or (l_values[tech] == 0): sig_params[tech]['midpoint'] = None sig_params[tech]['steepness'] = None sig_params[tech]['l_parameter'] = None else: # If difference is smaller than a certain share, approximate with linear if abs(ydata[1] - ydata[0]) < linear_approx_crit: sig_params[tech]['midpoint'] = 'linear' sig_params[tech]['steepness'] = 'linear' sig_params[tech]['l_parameter'] = 'linear' # Calculate linear slope and linear y-intercept (with two data points) sig_params[tech]['linear_slope'] = calc_m(xdata[0], xdata[1], ydata[0], ydata[1]) sig_params[tech]['linear_y_intercept'] = calc_c(sig_params[tech]['linear_slope'], xdata[0], ydata[0]) try: # Parameter fitting fit_parameter = calc_sigmoid_parameters( l_values[tech], xdata, ydata, fit_assump_init=fit_assump_init, error_range=error_range, number_of_iterations=number_of_iterations) sig_params[tech]['midpoint'] = fit_parameter[0] sig_params[tech]['steepness'] = fit_parameter[1] sig_params[tech]['l_parameter'] = l_values[tech] except: """If sigmoid fitting failed, implement linear diffusion The sigmoid diffusion may fail if the fitting does not work because the points to fit are too similar. This could be improved by increasing the number of iterations 'number_of_iterations' at higher computation costs """ #logging.warning("Instead of sigmoid a linear approximation is used %s %s %s", tech, xdata, ydata) sig_params[tech]['midpoint'] = 'linear' sig_params[tech]['steepness'] = 'linear' sig_params[tech]['l_parameter'] = 'linear' # Calculate linear slope and linear y-intercept (with two data points) sig_params[tech]['linear_slope'] = calc_m(xdata[0], xdata[1], ydata[0], ydata[1]) sig_params[tech]['linear_y_intercept'] = calc_c(sig_params[tech]['linear_slope'], xdata[0], ydata[0]) return dict(sig_params)