Skip to content

Coordinator

The Coordinator is the main class in CoFMPy. It controls the blocks internally in order to ease the usage of the library. For end users, this is the only interface to start with: from a JSON configuration file, the coordinator will instantiate and monitor all the required components.

from cofmpy import Coordinator

# Instantiate the Coordinator (no component is created until start() is called)
my_coordinator = Coordinator()

# Start the Coordinator (and all its components) from a JSON configuration file
my_coordinator.start(conf_path="my_config_file.json")

The Coordinator can then run the simulation using do_step() or run_simulation(). Internally, the Master component will execute the steps of the co-simulation.

# Run a fixed number of co-simulation steps, each advancing by step_size
n_steps = 100
for _ in range(n_steps):
    my_coordinator.do_step(step_size=0.05)

It is then possible to get the simulation results as a pandas DataFrame:

results_df = my_coordinator.get_results()
Source code in cofmpy/coordinator.py
69
70
71
72
73
74
75
76
def __init__(self):
    """Create an empty coordinator; every component is unset until start() runs."""
    # Configuration parser and the configuration data it produced
    self.config_parser = None
    self.config_data = None

    # Co-simulation components
    self.graph_engine = None
    self.master = None

    # Data stream handlers and data storages
    self.stream_handlers = None
    self.data_storages = None

do_step

do_step(step_size: float, save_data=False)

Perform a simulation step.

PARAMETER DESCRIPTION
step_size

simulation step size

TYPE: float

save_data

whether to save the data in the default CSV data storage. Defaults to False.

TYPE: bool DEFAULT: False

Source code in cofmpy/coordinator.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
def do_step(self, step_size: float, save_data=False):
    """
    Advance the co-simulation by one step.

    Every stream handler is queried at the master's current time, the collected
    values are passed to the master as inputs, and the master performs the step.
    When requested, one row (time, FMU outputs, stream values) is appended to the
    "results" data storage.

    Args:
        step_size (float): simulation step size
        save_data (bool): whether to save the data in the default CSV data storage.
            Defaults to False.

    Raises:
        RuntimeError: if the master has not been created yet (start() not called).
    """
    if self.master is None:
        raise RuntimeError("Coordinator not initialized. Call start() first.")

    # Query each inbound stream handler at the current simulation time
    stream_values = {}
    for stream_key, stream_handler in self.stream_handlers.items():
        stream_values[stream_key] = stream_handler.get_data(self.master.current_time)
    master_inputs = self._dict_tuple_to_dict_of_dict(stream_values)

    # Let the master perform the step with the stream values as inputs
    step_outputs = self.master.do_step(step_size, input_dict=master_inputs)

    if not save_data:
        return

    # One row: time after the step, first element of each FMU output, stream values
    row = [self.master.current_time]
    for fmu_outputs in step_outputs.values():
        row.extend(value[0] for value in fmu_outputs.values())
    row.extend(stream_values.values())
    self.data_storages["results"].save("results", [row])

get_causality

get_causality(name: tuple) -> str

Gets the causality of the given variable.

PARAMETER DESCRIPTION
name

variable name as (fmu_id, var_name).

TYPE: tuple

RETURNS DESCRIPTION
str

causality of the variable.

TYPE: str

Source code in cofmpy/coordinator.py
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
def get_causality(self, name: tuple) -> str:
    """
    Return the causality of one variable.

    The lookup is delegated to the FMU handler registered in the master under
    the given FMU identifier.

    Args:
        name (tuple): variable name as (fmu_id, var_name).

    Returns:
        str: causality of the variable.

    Raises:
        RuntimeError: if the master has not been created yet (start() not called).
    """
    master = self.master
    if master is None:
        raise RuntimeError("Coordinator not initialized. Call start() first.")

    fmu_id, var_name = name
    fmu_handler = master.fmu_handlers[fmu_id]
    return fmu_handler.get_causality(var_name)

get_results

get_results() -> dict

Get the results from the simulation.

RETURNS DESCRIPTION
dict

dataframe with the results.

TYPE: dict

Source code in cofmpy/coordinator.py
131
132
133
134
135
136
137
138
139
140
def get_results(self) -> dict:
    """
    Return the simulation results collected by the master.

    Returns:
        dict: the value returned by the master's get_results() method.

    Raises:
        RuntimeError: if the master has not been created yet (start() not called).
    """
    master = self.master
    if master is None:
        raise RuntimeError("Coordinator not initialized. Call start() first.")
    return master.get_results()

get_variable

get_variable(name: tuple) -> list

Get the value of the given tuple fmu/variable.

PARAMETER DESCRIPTION
name

variable name as (fmu_id, var_name).

TYPE: tuple

RETURNS DESCRIPTION
list

value of the variable, as a list.

TYPE: list

Source code in cofmpy/coordinator.py
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
def get_variable(self, name: tuple) -> list:
    """
    Return the value of one variable of one FMU.

    The lookup is delegated to the FMU handler registered in the master under
    the given FMU identifier.

    Args:
        name (tuple): variable name as (fmu_id, var_name).

    Returns:
        list: value of the variable, as a list.

    Raises:
        RuntimeError: if the master has not been created yet (start() not called).
    """
    master = self.master
    if master is None:
        raise RuntimeError("Coordinator not initialized. Call start() first.")

    fmu_id, var_name = name
    fmu_handler = master.fmu_handlers[fmu_id]
    return fmu_handler.get_variable(var_name)

get_variable_names

get_variable_names() -> list

Get the names of all variables in the system.

RETURNS DESCRIPTION
list

list of variable names as (fmu_id, var_name) tuples.

TYPE: list

Source code in cofmpy/coordinator.py
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
def get_variable_names(self) -> list:
    """
    List every variable exposed by the FMU handlers of the master.

    Returns:
        list: list of variable names as (fmu_id, var_name) tuples, in the
            iteration order of the master's FMU handlers.

    Raises:
        RuntimeError: if the master has not been created yet (start() not called).
    """
    if self.master is None:
        raise RuntimeError("Coordinator not initialized. Call start() first.")

    return [
        (fmu_id, var_name)
        for fmu_id, fmu_handler in self.master.fmu_handlers.items()
        for var_name in fmu_handler.get_variable_names()
    ]

get_variable_type

get_variable_type(name: tuple) -> str

Get the type of the given variable.

PARAMETER DESCRIPTION
name

variable name as (fmu_id, var_name).

TYPE: tuple

RETURNS DESCRIPTION
str

type of the variable.

TYPE: str

Source code in cofmpy/coordinator.py
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
def get_variable_type(self, name: tuple) -> str:
    """
    Return the type of one variable.

    The lookup is delegated to the FMU handler registered in the master under
    the given FMU identifier.

    Args:
        name (tuple): variable name as (fmu_id, var_name).

    Returns:
        str: type of the variable.

    Raises:
        RuntimeError: if the master has not been created yet (start() not called).
    """
    master = self.master
    if master is None:
        raise RuntimeError("Coordinator not initialized. Call start() first.")

    fmu_id, var_name = name
    fmu_handler = master.fmu_handlers[fmu_id]
    return fmu_handler.get_variable_type(var_name)

get_variables

get_variables(names: list) -> dict

Get the values of the given variables.

PARAMETER DESCRIPTION
names

list of variable names as (fmu_id, var_name) to get, e.g. [("fmu1", "var3"), ("fmu2", "var1")].

TYPE: list

RETURNS DESCRIPTION
dict

dictionary with the variable names and their values.

TYPE: dict

Source code in cofmpy/coordinator.py
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
def get_variables(self, names: list) -> dict:
    """
    Return the values of several variables at once.

    Each name is resolved with get_variable().

    Args:
        names (list): list of variable names as (fmu_id, var_name) to get,
            e.g. [("fmu1", "var3"), ("fmu2", "var1")].

    Returns:
        dict: dictionary mapping each requested name to its value.

    Raises:
        RuntimeError: if the master has not been created yet (start() not called).
    """
    if self.master is None:
        raise RuntimeError("Coordinator not initialized. Call start() first.")

    return {var_name: self.get_variable(var_name) for var_name in names}

load_data_storages

load_data_storages(data_storages_config: dict)

Load the data storages from the given dictionary of configurations.

PARAMETER DESCRIPTION
data_storages_config

dictionary containing the configurations for the data storages.

TYPE: dict

Source code in cofmpy/coordinator.py
206
207
208
209
210
211
212
213
214
215
216
217
def load_data_storages(self, data_storages_config: dict):
    """
    Build one data storage per entry of the given configuration dictionary.

    Each storage is created with BaseDataStorage.create_data_storage() and the
    resulting dictionary (same keys as the configuration) replaces
    self.data_storages.

    Args:
        data_storages_config (dict): dictionary containing the configurations for
            the data storages.
    """
    storages = {}
    for storage_key, storage_config in data_storages_config.items():
        storages[storage_key] = BaseDataStorage.create_data_storage(storage_config)
    self.data_storages = storages

load_stream_handlers

load_stream_handlers(stream_handlers_config: dict)

Load the stream handlers from the given dictionary of configurations.

PARAMETER DESCRIPTION
stream_handlers_config

dictionary containing the configurations for the stream handlers.

TYPE: dict

Source code in cofmpy/coordinator.py
193
194
195
196
197
198
199
200
201
202
203
204
def load_stream_handlers(self, stream_handlers_config: dict):
    """
    Build one stream handler per entry of the given configuration dictionary.

    Each handler is created with BaseDataStreamHandler.create_handler() and the
    resulting dictionary (same keys as the configuration) replaces
    self.stream_handlers.

    Args:
        stream_handlers_config (dict): dictionary containing the configurations for
            the stream handlers.
    """
    handlers = {}
    for handler_key, handler_config in stream_handlers_config.items():
        handlers[handler_key] = BaseDataStreamHandler.create_handler(handler_config)
    self.stream_handlers = handlers

parse_config

parse_config(config_path: str)

Start the configuration parser to parse the given configuration file.

PARAMETER DESCRIPTION
config_path

path to the configuration file

TYPE: str

Source code in cofmpy/coordinator.py
142
143
144
145
146
147
148
149
150
151
def parse_config(self, config_path: str):
    """
    Parse the given configuration file.

    A ConfigParser is created for the file and stored in self.config_parser;
    its config_dict is exposed as self.config_data.

    Args:
        config_path (str): path to the configuration file
    """
    self.config_parser = ConfigParser(config_path)
    # Keep a direct handle on the parsed configuration dictionary
    self.config_data = self.config_parser.config_dict

run_simulation

run_simulation(step_size: float, end_time: float)

Run the simulation until the given end time.

PARAMETER DESCRIPTION
step_size

simulation step size

TYPE: float

end_time

simulation end time

TYPE: float

Source code in cofmpy/coordinator.py
253
254
255
256
257
258
259
260
261
262
263
264
265
def run_simulation(self, step_size: float, end_time: float):
    """
    Run the simulation until the given end time.

    Steps are performed with do_step() (without saving data) for as long as the
    master's current time is strictly lower than end_time.

    Args:
        step_size (float): simulation step size. Must be strictly positive.
        end_time (float): simulation end time

    Raises:
        RuntimeError: if the master has not been created yet (start() not called).
        ValueError: if step_size is not strictly positive (zero, negative or NaN).
    """
    if self.master is None:
        raise RuntimeError("Coordinator not initialized. Call start() first.")

    # A step that does not move time forward would never let current_time reach
    # end_time, so the loop below would spin forever (assuming the master
    # advances its time by step_size). "not > 0" also rejects NaN.
    if not step_size > 0:
        raise ValueError(f"step_size must be strictly positive, got {step_size!r}.")

    while self.master.current_time < end_time:
        self.do_step(step_size)

save_results

save_results(filename: str)

Save the results to a CSV file.

PARAMETER DESCRIPTION
filename

name of the CSV file to save the results to.

TYPE: str

Source code in cofmpy/coordinator.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
def save_results(self, filename: str):
    """
    Write the simulation results to a CSV file.

    The results returned by get_results() are loaded into a DataFrame. The "time"
    column is written first, followed by the remaining columns in sorted order.
    Columns named by a tuple (fmu, var_name) get the header "fmu.var_name".

    Args:
        filename (str): name of the CSV file to save the results to.
    """
    results_frame = pd.DataFrame.from_dict(self.get_results())

    # "time" always comes first; every other column follows in sorted order
    other_columns = results_frame.columns.tolist()
    other_columns.remove("time")
    ordered_columns = ["time", *sorted(other_columns)]

    # Tuple column names (fmu, var_name) are flattened to "fmu.var_name"
    csv_headers = [
        f"{column[0]}.{column[1]}" if isinstance(column, tuple) else column
        for column in ordered_columns
    ]

    results_frame.to_csv(
        filename, columns=ordered_columns, header=csv_headers, index=False
    )

start

start(
    conf_path: str,
    fixed_point_init=False,
    fixed_point_kwargs=None,
)

Start the coordinator with the given configuration file.

PARAMETER DESCRIPTION
conf_path

path to the configuration file.

TYPE: str

fixed_point_init

whether to use the fixed-point initialization method.

TYPE: bool DEFAULT: False

fixed_point_kwargs

keyword arguments for the fixed-point initialization method, used when fixed_point_init is set to True. Defaults to None, in which case the default values are used: "solver": "fsolve", "time_step": minimum_default_step_size, and "xtol": 1e-5.

TYPE: dict DEFAULT: None

Source code in cofmpy/coordinator.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def start(self, conf_path: str, fixed_point_init=False, fixed_point_kwargs=None):
    """
    Start the coordinator and all its components from a configuration file.

    The configuration is parsed, then the graph engine, the master, the stream
    handlers and the data storages are created in that order. An additional
    "results" file storage is registered and the header row of the results file
    is written.

    Args:
        conf_path (str): path to the configuration file.
        fixed_point_init (bool): whether to use the fixed-point initialization method.
        fixed_point_kwargs (dict): keyword arguments for the fixed-point
            initialization method, used when fixed_point_init is set to True.
            Defaults to None, in which case the default values are used:
            "solver": "fsolve", "time_step": minimum_default_step_size, and
            "xtol": 1e-5.
    """
    # Step 1: parse the configuration file
    self.parse_config(conf_path)

    # Step 2: build the graph engine from the parsed graph configuration
    self.start_graph_engine(self.config_parser.graph_config)

    # Step 3: hand the sequence order computed by the graph engine to the master
    sequence_order = self.graph_engine.sequence_order
    self.config_parser.master_config["sequence_order"] = sequence_order
    self.start_master(
        self.config_parser.master_config,
        fixed_point_init=fixed_point_init,
        fixed_point_kwargs=fixed_point_kwargs,
    )

    # Step 4: create the data stream handlers
    self.load_stream_handlers(self.config_parser.stream_handlers)

    # Step 5: create the data storages
    self.load_data_storages(self.config_parser.data_storages)

    # Step 6: register an extra file storage that collects all the results
    results_storage_config = {
        "type": "file",
        "config": {"output_dir": "./storage", "overwrite": True},
    }
    self.data_storages["results"] = BaseDataStorage.create_data_storage(
        results_storage_config
    )

    # Header row of the results file: time, then FMU outputs, then streams
    header = ["t"]
    for fmu_id, fmu_outputs in self.master.get_outputs().items():
        header.extend(f"{fmu_id}.{output_name}" for output_name in fmu_outputs)
    header.extend(f"{stream[0]}.{stream[1]}" for stream in self.stream_handlers)
    self.data_storages["results"].save("results", [header])

start_graph_engine

start_graph_engine(config: dict)

Start the graph engine with the given configuration.

PARAMETER DESCRIPTION
config

configuration for the graph engine containing the FMUs, connections, and edge separation.

TYPE: dict

Source code in cofmpy/coordinator.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def start_graph_engine(self, config: dict):
    """
    Create the graph engine from the given configuration.

    The engine is stored in self.graph_engine.

    Args:
        config (dict): configuration for the graph engine. The keys "fmus",
            "symbolic_nodes", "connections" and "edge_sep" are read.
    """
    fmus = config["fmus"]
    symbolic_nodes = config["symbolic_nodes"]
    connections = config["connections"]
    edge_sep = config["edge_sep"]

    self.graph_engine = GraphEngine(fmus, symbolic_nodes, connections, edge_sep)

start_master

start_master(
    config: dict,
    fixed_point_init=False,
    fixed_point_kwargs=None,
)

Start the master algorithm with the given configuration.

PARAMETER DESCRIPTION
config

configuration for the master algorithm containing the FMUs, connections, sequence order, and loop method.

TYPE: dict

fixed_point_init

whether to use the fixed-point initialization method.

TYPE: bool DEFAULT: False

fixed_point_kwargs

keyword arguments for the fixed-point initialization method, used when fixed_point_init is set to True. Defaults to None, in which case the default values are used: "solver": "fsolve", "time_step": minimum_default_step_size, and "xtol": 1e-5.

TYPE: dict DEFAULT: None

Source code in cofmpy/coordinator.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
def start_master(
    self, config: dict, fixed_point_init=False, fixed_point_kwargs=None
):
    """
    Create the master algorithm and initialize the simulation.

    The master is stored in self.master and its init_simulation() method is
    called with an empty input dictionary.

    Args:
        config (dict): configuration for the master algorithm. The keys "fmus",
            "connections", "sequence_order" and "loop_solver" are read.
        fixed_point_init (bool): whether to use the fixed-point initialization method.
        fixed_point_kwargs (dict): keyword arguments for the fixed-point
            initialization method, used when fixed_point_init is set to True.
            Defaults to None, in which case the default values are used:
            "solver": "fsolve", "time_step": minimum_default_step_size, and
            "xtol": 1e-5.
    """
    master_kwargs = {
        "fmu_config_list": config["fmus"],
        "connections": config["connections"],
        "sequence_order": config["sequence_order"],
        "loop_solver": config["loop_solver"],
        "fixed_point": fixed_point_init,
        "fixed_point_kwargs": fixed_point_kwargs,
    }
    self.master = Master(**master_kwargs)

    # No external input is provided at initialization time
    self.master.init_simulation(input_dict={})