API Reference

Backend

ml4cps.automata

Automaton

Bases: CPSComponent

The Automaton class is the main class for modeling various kinds of hybrid systems.
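According to the constructor in the source listing, transitions can be supplied either as dicts (with "source", "event" and "destination" or "dest") or as tuples of the form (source, event, destination, data). The following is a minimal standalone sketch of that normalization, not the library's own code path: the helper name `normalize_transition` and the example states and events are hypothetical and chosen only for illustration.

```python
def normalize_transition(tr):
    """Return (source, destination, event, data) for a dict or tuple spec.

    Hypothetical helper that mirrors the input handling in the constructor.
    """
    if isinstance(tr, dict):
        data = dict(tr)  # copy so the caller's dict is not mutated
        source = data.pop("source")
        # Accept both spellings; "dest" is removed first, then "destination".
        destination = data.pop("destination", data.pop("dest", None))
        if destination is None:
            raise ValueError('Transition dict must contain "destination" or "dest".')
        event = data.pop("event")
        return source, destination, event, data

    if len(tr) < 3:
        raise ValueError("Transition tuple must contain at least (source, event, destination).")
    source, event, destination = tr[0], tr[1], tr[2]
    # An optional fourth element is used as transition data only if it is a dict.
    data = dict(tr[3]) if len(tr) > 3 and isinstance(tr[3], dict) else {}
    return source, destination, event, data


# Illustrative input: the state and event names are made up.
specs = [
    {"source": "idle", "dest": "heating", "event": "start", "time": 2.0},
    ("heating", "stop", "idle"),
    ("idle", "fault", "error", {"probability": 0.1}),
]
normalized = [normalize_transition(s) for s in specs]
```

Note that the tuple form puts the event in the middle, while the normalized result (like the `add_transition` calls in the listing) orders the fields as source, destination, event.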

Source code in ml4cps/automata/base.py
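import inspect
import random
from collections import OrderedDict

import networkx as nx
import numpy as np

# CPSComponent is defined elsewhere in ml4cps; its import is not part of this listing.


class Automaton(CPSComponent):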
    """
    Automaton class is the main class for modeling various kinds of hybrid systems.
    """

    def __init__(self, states: list = None, transitions: list = None, configuration=None,
                 unknown_state: str = 'raise', id="", initial_q=(), initial_r=None, final_q=(), super_states=(), decision_states=(),
                 **kwargs):
        """
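        Initialize the automaton from lists of states and transitions.
        :param states: Discrete states / modes of continuous behavior. Each entry is either a state name or a dict
        containing "name" (or "state"); the remaining dict entries are passed on to add_state.
        :param transitions: The transition information. If a collection of dicts, each dict must contain "source",
        "event" and "destination" (or "dest"). The other attributes are added as data of that transition.
        Alternatively, a collection of tuples of the form (source, event, dest, data) can be used, where the optional
        fourth element is a dict of transition data.
        :param configuration: The active configuration, used to resolve configuration-dependent values.
        :param unknown_state: The name of unknown states during "play in"; if "raise", an exception will be raised.
        :param initial_q: Initial discrete state(s), given as a single name or a collection of names.
        :param final_q: Final discrete state(s), given as a single name or a collection of names.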
        """
        self._G = nx.MultiDiGraph()
        if initial_q and isinstance(initial_q, str):
            initial_q = [initial_q]
        self.q0 = OrderedDict.fromkeys(initial_q)
        self.initial_r = initial_r
        if final_q and isinstance(final_q, str):
            final_q = [final_q]
        self.final_q = OrderedDict.fromkeys(final_q)
        self.previous_node_positions = None
        self.UNKNOWN_STATE = unknown_state
        self.configuration = configuration
        # Always define both attributes so that later lookups cannot fail when None is passed.
        if super_states is None:
            self.__super_states = []
        elif isinstance(super_states, str):
            self.__super_states = [super_states]
        else:
            self.__super_states = list(super_states)

        # self._G.add_nodes_from(self.__super_states)

        if decision_states is None:
            self.decision_states = []
        elif isinstance(decision_states, str):
            self.decision_states = [decision_states]
        else:
            self.decision_states = list(decision_states)

        if states is not None:
            for state in states:
                if isinstance(state, dict):
                    state_data = dict(state)
                    state_name = state_data.pop("name", state_data.pop("state", None))
                    if state_name is None:
                        raise ValueError('State dict must contain "name" or "state".')
                    if "configs" in state_data:
                        raise ValueError('State dict field "configs" is not supported. Use "enabled" instead.')
                    self.add_state(state_name, **state_data)
                else:
                    self.add_state(state)

        if transitions is not None:
            for tr in transitions:
                if type(tr) is dict:
                    tr_data = dict(tr)
                    source = tr_data.pop("source")
                    destination = tr_data.pop("destination", tr_data.pop("dest", None))
                    if destination is None:
                        raise ValueError('Transition dict must contain "destination" or "dest".')
                    event = tr_data.pop("event")
                    guard = tr_data.pop("guard", None)
                    if "configs" in tr_data:
                        raise ValueError('Transition dict field "configs" is not supported. Use "enabled" instead.')
                    self.add_transition(source, destination, event, guard=guard, **tr_data)
                else:
                    if len(tr) < 3:
                        raise ValueError("Transition tuple must contain at least (source, event, destination).")
                    source, event, destination = tr[0], tr[1], tr[2]
                    transition_data = tr[3] if len(tr) > 3 and isinstance(tr[3], dict) else {}
                    self.add_transition(source, destination, event, **transition_data)

        if 'discr_state_names' not in kwargs:
            kwargs['discr_state_names'] = ['Mode']
        elif type(kwargs['discr_state_names']) is str:
            kwargs['discr_state_names'] = [kwargs['discr_state_names']]
        CPSComponent.__init__(self, id, unknown_state=unknown_state, **kwargs)

    def _resolve_enabled_for_add(self, enabled=None):
        if enabled is not None:
            return enabled
        if self.configuration is not None:
            return {self.configuration: 1}
        return None

    @staticmethod
    def _normalize_state_ref(state):
        if isinstance(state, tuple) and len(state) == 1:
            return state[0]
        return state

    def set_configuration(self, configuration):
        self.configuration = configuration

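    def _resolve_config_value(self, value, source=None, destination=None, key=None, data=None, default=None):
        """
        Resolve a possibly configuration-dependent value.
        Callables are called with as many of (configuration, source, destination, key, data) as they accept
        positionally. Dicts are looked up by the active configuration, with "*", "default" and None as fallback
        keys. None yields the default, and any other value is returned unchanged.
        """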
        if callable(value):
            try:
                signature = inspect.signature(value)
                params = list(signature.parameters.values())
                has_varargs = any(p.kind == inspect.Parameter.VAR_POSITIONAL for p in params)
                positional_params = [
                    p for p in params
                    if p.kind in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
                ]
                call_args = (self.configuration, source, destination, key, data)
                if has_varargs:
                    return value(*call_args)
                return value(*call_args[:len(positional_params)])
            except (TypeError, ValueError):
                try:
                    return value(self.configuration)
                except TypeError:
                    return value()

        if isinstance(value, dict):
            if self.configuration in value:
                return value[self.configuration]
            for fallback_key in ("*", "default", None):
                if fallback_key in value:
                    return value[fallback_key]
            return default

        if value is None:
            return default
        return value

    def _resolve_transition_weight(self, source, destination, key, data, default=1.0):
        for field in ("frequency", "freq", "p", "probability", "prob"):
            if field in data:
                resolved = self._resolve_config_value(
                    data.get(field),
                    source=source,
                    destination=destination,
                    key=key,
                    data=data,
                    default=default,
                )
                if resolved is None:
                    return default
                try:
                    return float(resolved)
                except (TypeError, ValueError):
                    return default
        return default

    def _resolve_transition_time(self, source, destination, key, data):
        time_value = self._resolve_config_value(
            data.get("time"),
            source=source,
            destination=destination,
            key=key,
            data=data,
            default=None,
        )
        if time_value is not None:
            return time_value

        interval_value = self._resolve_config_value(
            data.get("interval"),
            source=source,
            destination=destination,
            key=key,
            data=data,
            default=None,
        )
        if interval_value is not None:
            if isinstance(interval_value, (tuple, list, np.ndarray)) and len(interval_value) >= 2:
                min_interval = float(interval_value[0])
                max_interval = float(interval_value[1])
                if max_interval < min_interval:
                    min_interval, max_interval = max_interval, min_interval
                return random.uniform(min_interval, max_interval)
            return interval_value

        min_interval = None
        max_interval = None
        for field in ("min_interval", "minTiming", "min_time"):
            if field in data:
                min_interval = self._resolve_config_value(
                    data.get(field),
                    source=source,
                    destination=destination,
                    key=key,
                    data=data,
                    default=None,
                )
                break
        for field in ("max_interval", "maxTiming", "max_time"):
            if field in data:
                max_interval = self._resolve_config_value(
                    data.get(field),
                    source=source,
                    destination=destination,
                    key=key,
                    data=data,
                    default=None,
                )
                break

        if min_interval is not None and max_interval is not None:
            min_interval = float(min_interval)
            max_interval = float(max_interval)
            if max_interval < min_interval:
                min_interval, max_interval = max_interval, min_interval
            return random.uniform(min_interval, max_interval)
        if min_interval is not None:
            return float(min_interval)
        if max_interval is not None:
            return float(max_interval)
        return None

    @staticmethod
    def _normalize_timing_values(timing):
        if timing is None:
            return []
        if isinstance(timing, (list, tuple, np.ndarray)):
            return list(timing)
        if isinstance(timing, (str, bytes)):
            return [timing]
        try:
            return list(timing)
        except TypeError:
            return [timing]

    def _get_timing_values_for_transition_data(self, data):
        resolved = self._resolve_config_value(
            data.get("timing", []),
            source=None,
            destination=None,
            key=None,
            data=data,
            default=[],
        )
        return self._normalize_timing_values(resolved)

    def _timing_count_for_transition_data(self, data):
        count_timings = 0
        if 'timing' in data:
            timings = data['timing']
            if isinstance(timings, dict):
                count_timings = {k: len(v) for k, v in timings.items()}
            else:
                count_timings = len(timings)
        return count_timings

    def _state_allowed(self, state):
        if not self._G.has_node(state):
            return False
        state_data = self._G.nodes[state]

        enabled_data = state_data.get("enabled", None)
        if enabled_data is None:
            return True
        enabled = self._resolve_config_value(
            enabled_data,
            source=state,
            destination=state,
            key=None,
            data=state_data,
            default=0,
        )
        return bool(enabled)

    def _transition_allowed(self, source, destination, key, data):
        if not (self._state_allowed(source) and self._state_allowed(destination)):
            return False

        enabled_data = data.get("enabled", None)
        if enabled_data is not None:
            enabled = self._resolve_config_value(
                enabled_data,
                source=source,
                destination=destination,
                key=key,
                data=data,
                default=0,
            )
            if not bool(enabled):
                return False

        guard = data.get("guard")
        if guard is None:
            return True
        return bool(self._resolve_config_value(
            guard,
            source=source,
            destination=destination,
            key=key,
            data=data,
            default=True,
        ))

    @property
    def Sigma(self):
        Sigma = []
        for _, _, key, data in self.get_transitions(active_only=True):
            new_el = data.get("event", key)
            if new_el not in Sigma:
                Sigma.append(new_el)
        return Sigma

    @property
    def num_events(self):
        return len(self.Sigma)

    @property
    def discrete_states(self):
        return self._G.nodes

    @property
    def active_states(self):
        return [s for s in self.discrete_states if self._state_allowed(s)]

    @property
    def transitions(self):
        return self._G.edges(data=True)

    @property
    def discrete_state(self):
        if self._q is not None:
            if len(self._q) == 0:
                return ()
            return self._q[0]
        return None

    @discrete_state.setter
    def discrete_state(self, value):
        self._q = (value,)

    @property
    def state(self):
        """
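        The discrete state of an automaton is univariate, so only the first element of the discrete state tuple
        is used.
        :return: Tuple of the discrete state and the stored _xt and _xk values.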
        """
        if len(self._q) < 1:
            raise Exception(f'State of "{self.id}" is empty.')
        return self._q[0], self._xt, self._xk

    @state.setter
    def state(self, state):
        if type(state) is not tuple:
            new_states = (state, (), ())
        elif len(state) < 3:
            new_states = [(), (), ()]
            for i, v in enumerate(state):
                new_states[i] = v
        else:
            new_states = state

        if new_states[0] not in self.discrete_states and new_states[0] not in self.__super_states and new_states[0] != self.UNKNOWN_STATE:
            raise ValueError(f'State "{new_states[0]}" is not a valid state. Valid states are: {self.discrete_states}')
        if new_states[0] in self.discrete_states and not self._state_allowed(new_states[0]):
            raise ValueError(f'State "{new_states[0]}" is not allowed for configuration "{self.configuration}".')

        self._q = (new_states[0],)
        self._xt = new_states[1]
        self._xk = new_states[2]

    @property
    def num_modes(self):
        """
        Returns the number of modes in the automaton.
        :return: number of states.
        """
        return self._G.number_of_nodes()

    @property
    def num_transitions(self):
        """
        Returns the number of transitions in the automaton.
        :return: number of transitions.
        """
        return self._G.number_of_edges()

    def merge(self, q1, q2):
        """
        Merge state q2 into state q1. The incoming and outgoing transitions of q2 are redirected to q1
        (self-loops of q2 become self-loops of q1), their timing data is carried over, and q2 is removed.
        """
        # Copy both transition collections, because the graph is modified while iterating.
        intr = list(self.in_transitions(q2))
        outtr_q2 = list(self.out_transitions(q2))

        for tr in intr:
            if tr[0] != q2:
                # Transitions without recorded timings are merged without timing data.
                self.add_single_transition(tr[0], q1, tr[2], timing=tr[-1].get('timing'))
        for tr in outtr_q2:
            dest = tr[1]
            if dest == q2:
                dest = q1
            self.add_single_transition(q1, dest, tr[2], timing=tr[-1].get('timing'))
        self.remove_state(q2)

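    def determinize(self, s, state_index, verbose=False):
        """
        Starting from state s, repeatedly merge destination states that are reached via the same event.
        Of each conflicting pair, the state with the larger state_index is merged into the one with the smaller.
        """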
        # out_tr = list(sorted(self.out_transitions(s), key=lambda x: state_index[x[1]]))
        if verbose:
            print('Determinize', s)
        # ind = sorted(, key=lambda x: state_index[x])
        # state_index_list = [None] * len(state_index)
        # for k, v in state_index.items():
        #     state_index_list[v] = k
        # if verbose:
        #     pprint.pprint(state_index_list[state_index[s]:], compact=True)

        to_analyze = [s]
        while to_analyze:  # state_ind < len(state_index_list):
            # print(state_ind)
            s = to_analyze.pop()
            # if not self.is_state(s):
            #     continue

            conflicting_transitions = {}
            to_merge = None
            for out_tr in self.out_transitions(s):
                if out_tr[2] in conflicting_transitions:
                    to_merge = (conflicting_transitions[out_tr[2]], out_tr[1])
                    break
                else:
                    conflicting_transitions[out_tr[2]] = out_tr[1]
            # out_tr = list(self.out_transitions(st))
            if to_merge is not None:
                if state_index[to_merge[0]] < state_index[to_merge[1]]:
                    merge_into = to_merge[0]
                    to_be_merged = to_merge[1]
                else:
                    merge_into = to_merge[1]
                    to_be_merged = to_merge[0]

                to_analyze.append(s)
                to_analyze.append(merge_into)
                if verbose:
                    print('Merge to determinize: ', to_be_merged, 'into', merge_into)
                self.merge(merge_into, to_be_merged)

        # to_determinize = []
        # for i, tr in enumerate(out_tr):
        #     if self.is_state(tr[1]):
        #         for tr2 in out_tr[:i:-1]:
        #             if self.is_state(tr2[1]) and tr[2] == tr2[2]:
        #                 # print(tr[1], ',', tr2[1])
        #                 if state_index[tr2[1]] > state_index[tr[1]]:
        #                     print('Merge to determinize: ', tr2[1], 'into', tr[1])
        #                     self.merge(tr[1], tr2[1])
        #                     to_determinize.append(tr[1])
        #                     # self.determinize(tr[1], state_index)
        #                 else:
        #                     print('Merge to determinize: ', tr[1], 'into', tr2[1])
        #                     self.merge(tr2[1], tr[1])
        #                     to_determinize.append(tr2[1])
        #                     # self.determinize(tr2[1], state_index)
        # for s in to_determinize:
        #     self.determinize(s, state_index)

    def accepts(self, string, return_states=False):
        state_path = []
        current_state = next(iter(self.q0))
        state_path.append(current_state)
        for symbol in string:
            trans = self.out_transitions(current_state, symbol)
            if trans is None or not trans:
                if return_states:
                    return False, state_path
                else:
                    return False
            elif len(trans) > 1:
                raise Exception(f'Too many transitions from {current_state} given symbol {symbol}.')
            else:
                current_state = trans[0][1]
                state_path.append(current_state)
        # Check for a missing or empty set of final states first, so that "in" is never applied to None.
        accepted = not self.final_q or current_state in self.final_q
        if return_states:
            return accepted, state_path
        else:
            return accepted

    def generate(self, number_of_sequences=1, return_states=False, max_steps=100, prob_to_accept=0.5):
        if number_of_sequences > 1:
            res = [self.generate(return_states=return_states, max_steps=max_steps, prob_to_accept=prob_to_accept)
                   for _ in range(number_of_sequences)]
            return res
        state_path = []
        current_state = next(iter(self.q0))
        event_path = []
        state_path.append(current_state)
        while True:
            if current_state in self.final_q:
                if random.random() < prob_to_accept:
                    break
            trans = self.out_transitions(current_state)
            if trans is None or not trans:
                if current_state not in self.final_q:
                    raise Exception(f'State "{current_state}" is not an accepting state, but no possible transitions.')
                break
            else:
                weighted_transitions = []
                for source, destination, key, data in trans:
                    weight = self._resolve_transition_weight(source, destination, key, data, default=1.0)
                    weighted_transitions.append((source, destination, key, data, max(weight, 0.0)))
                if any(x[4] > 0 for x in weighted_transitions):
                    new_trans = random.choices(
                        weighted_transitions,
                        weights=[x[4] for x in weighted_transitions]
                    )[0]
                else:
                    new_trans = random.choice(weighted_transitions)
                current_state = new_trans[1]
                current_event = new_trans[3].get('event', new_trans[2])
                state_path.append(current_state)
                event_path.append(current_event)
                if max_steps is not None and len(event_path) >= max_steps:
                    break
        if return_states:
            return event_path, state_path
        else:
            return event_path


    def try_merge_states(self, state1, state2, try_fun=None):
        """Merge state2 into state1; roll back if the result is non-deterministic or try_fun rejects it."""
        old_G = self._G
        make_final = state2 in self.final_q and state1 not in self.final_q
        make_initial = state2 in self.q0 and state1 not in self.q0
        self._G = nx.contracted_nodes(self._G, state1, state2)
        # make it deterministic again
        events = [x[3]['event'] for x in self.out_transitions(state1)]
        deterministic = len(set(events)) == len(events)

        if deterministic:
            if make_initial:
                self.add_initial_state(state1)
            if make_final:
                self.add_final_state(state1)

            if try_fun is not None and not try_fun(self):
                self._G = old_G
                if make_final:
                    self.final_q.pop(state1)
                if make_initial:
                    self.q0.pop(state1)
        else:
            self._G = old_G


    def remove_transition(self, source, dest):
        """
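        Remove a transition from source to dest.
        If a configuration is set, the edge is kept and only the entries of the active configuration are dropped
        from the dict-valued attributes of the transition. Otherwise the edge is removed from the graph.
        :param source: Source state of the transition.
        :param dest: Destination state of the transition.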
        """
        if self.configuration is not None:
            tr = self.get_transition(source, dest)
            for v in tr[3].values():
                if isinstance(v, dict):
                    v.pop(self.configuration, None)
        else:
            self._G.remove_edge(source, dest)

    def is_decision(self, state, overall_state):
        return state in self.decision_states

    def get_alternatives(self, state, system_state):
        if self.is_decision(state, system_state):
            trans = [(x[3]['event'], dict()) for x in self.out_transitions(state)]
            return [(None, None)] + trans
        else:
            return None

    def remove_rare_transitions(self, min_p=0, min_num=0, keep_from_initial=False, keep_states=False, keep=None):
        self.learn_transition_probabilities()
        # Iterate over a copy, because transitions may be removed inside the loop.
        for source, dest, event, data in list(self.get_transitions()):
            if keep_from_initial and source in self.q0:
                continue
            frequency = self._timing_count_for_transition_data(data)
            if isinstance(frequency, dict):
                for k, f in frequency.items():
                    if f < min_num:
                        self.set_configuration(k)
                        self.remove_transition(source, dest)
                self.set_configuration(None)
                frequency = self._timing_count_for_transition_data(data)
                if isinstance(frequency, dict):
                    freq_condition = len(frequency) == 0
                else:
                    freq_condition = frequency <= min_num
            else:
                freq_condition = frequency <= min_num
            if (freq_condition or data['probability'] < min_p) and \
                    (keep is None or (source not in keep and dest not in keep)):
                self.remove_transition(source, dest)

        if not keep_states:
            for s in list(self.discrete_states):
                # if s in self.q0:
                #     continue
                if len(self.in_transitions(s)) == 0 and len(self.out_transitions(s)) == 0:
                    self.remove_state(s)

            # if self.DummyInitial:
            #     for s in list(self.InitialState):
            #         if len(self.out_transitions(s)) == 0:
            #             self.States.pop(s)
            #             self.InitialState.pop(s)

        # recalculate probabilities
        if min_p:
            self.learn_transition_probabilities()

    def learn_transition_probabilities(self):
        for s in self.discrete_states:
            outgoing = list(self.out_transitions(s))
            total_num = 0
            for _, _, _, data in outgoing:
                timings = self._timing_count_for_transition_data(data)
                if isinstance(timings, dict):
                    total_num += sum(timings.values())
                else:
                    total_num += timings
            for _, _, _, data in outgoing:
                timings = self._timing_count_for_transition_data(data)
                if isinstance(timings, dict):
                    num = sum(timings.values())
                else:
                    num = timings
                data['probability'] = (num / total_num) if total_num else 0.0

    def state_is_deterministic(self, q):
        events = set()
        for tr in self.out_transitions(q):
            if tr[2] in events:
                return False
            else:
                events.add(tr[2])
        return True

    def update_timing_boundaries(self, source, destination, event, newTiming):
        edge_data = self._G.get_edge_data(source, destination, event)
        try:
            if newTiming < edge_data['minTiming']:
                edge_data['minTiming'] = newTiming
            elif newTiming > edge_data['maxTiming']:
                edge_data['maxTiming'] = newTiming
        except KeyError:
            edge_data['minTiming'] = newTiming
            edge_data['maxTiming'] = newTiming

    def is_deterministic(self):
        for q in self.discrete_states:
            if not self.state_is_deterministic(q):
                print('State', q, 'not deterministic:')
                for tr in sorted(self.out_transitions(q), key=lambda x: x[2]):
                    print(tr[0], '->', tr[2], '->', tr[1])
                return False
        return True

    def add_single_transition(self, s, d, e, timing=None):
        timing_values = None if timing is None else self._normalize_timing_values(timing)
        edge_data = self._G.get_edge_data(s, d, e)
        if edge_data is None:
            if timing is None:
                self._G.add_edge(s, d, key=e, event=e)
            else:
                if self.configuration is None:
                    self._G.add_edge(s, d, key=e, event=e, timing=timing_values)
                else:
                    self._G.add_edge(s, d, key=e, event=e, timing={self.configuration: timing_values})
            return

        if timing is None:
            return

        existing_timing = edge_data.get("timing", None)
        if self.configuration is None:
            if isinstance(existing_timing, dict):
                existing_timing.setdefault(None, []).extend(timing_values)
            elif existing_timing is None:
                edge_data["timing"] = timing_values
            else:
                edge_data["timing"] = self._normalize_timing_values(existing_timing) + timing_values
        else:
            if isinstance(existing_timing, dict):
                existing_timing.setdefault(self.configuration, []).extend(timing_values)
            elif existing_timing is None:
                edge_data["timing"] = {self.configuration: timing_values}
            else:
                edge_data["timing"] = {
                    None: self._normalize_timing_values(existing_timing),
                    self.configuration: timing_values,
                }

    def add_state_data(self, s: str, d: object):
        """
        Add data to state s of the automaton.
        :param s: State.
        :param d: Data to be attached to s.
        """
        self.Q[s] = d

    def add_state(self, new_state, enabled=None, **kwargs):
        """
    Add state to the automaton.
        :param new_state: State to be added.
        """
        resolved_enabled = self._resolve_enabled_for_add(enabled=enabled)
        if resolved_enabled is not None:
            kwargs["enabled"] = resolved_enabled
        self._G.add_node(new_state, **kwargs)

    def add_states_from(self, new_state, enabled=None, **kwargs):
        """
    Add multiple states to the automaton.
        :param new_state: States to be added.
        """
        if isinstance(new_state, str):
            new_state = [new_state]
        for state in new_state:
            if isinstance(state, dict):
                state_data = dict(state)
                state_name = state_data.pop("name", state_data.pop("state", None))
                if state_name is None:
                    raise ValueError('State dict must contain "name" or "state".')
                if "configs" in state_data:
                    raise ValueError('State dict field "configs" is not supported. Use "enabled" instead.')
                state_enabled = state_data.pop("enabled", enabled)
                merged_state_data = dict(kwargs)
                merged_state_data.update(state_data)
                self.add_state(state_name, enabled=state_enabled, **merged_state_data)
            elif isinstance(state, tuple) and len(state) == 2 and isinstance(state[1], dict):
                state_data = dict(state[1])
                if "configs" in state_data:
                    raise ValueError('State dict field "configs" is not supported. Use "enabled" instead.')
                state_enabled = state_data.pop("enabled", enabled)
                merged_state_data = dict(kwargs)
                merged_state_data.update(state_data)
                self.add_state(state[0], enabled=state_enabled, **merged_state_data)
            else:
                self.add_state(state, enabled=enabled, **kwargs)

    def add_transitions_from(self, list_of_tuples, **other):
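        """
        Add multiple transitions.
        :param list_of_tuples: Transitions, each either a tuple (source_state, destination_state, event[, data_dict])
            or a dict with the keys "source", "destination" (or "dest") and "event".
        :param other: Default attributes applied to every transition; per-transition data overrides them.
        """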
        for tr in list_of_tuples:
            if isinstance(tr, dict):
                tr_data = dict(other)
                tr_data.update(tr)
                source = tr_data.pop("source")
                destination = tr_data.pop("destination", tr_data.pop("dest", None))
                if destination is None:
                    raise ValueError('Transition dict must contain "destination" or "dest".')
                event = tr_data.pop("event")
                if "configs" in tr_data:
                    raise ValueError('Transition dict field "configs" is not supported. Use "enabled" instead.')
                enabled = tr_data.pop("enabled", None)
                guard = tr_data.pop("guard", None)
                self.add_transition(source, destination, event, enabled=enabled, guard=guard, **tr_data)
            else:
                if len(tr) < 3:
                    raise ValueError("Transition tuple must contain at least (source, destination, event).")
                source, destination, event = tr[0], tr[1], tr[2]
                tr_data = dict(other)
                if len(tr) > 3 and isinstance(tr[3], dict):
                    tr_data.update(tr[3])
                self.add_transition(source, destination, event, **tr_data)

    def add_transition(self, s, d, e, enabled=None, guard=None, **other):
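        """
        Add a single transition.
        :param s: Source state.
        :param d: Destination state.
        :param e: Event; also used as the key of the edge.
        :param enabled: Optional value resolved by _resolve_enabled_for_add and stored on the transition if not None.
        :param guard: Optional guard stored on the transition.
        :param other: Additional attributes stored as data of the transition.
        """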
        resolved_enabled = self._resolve_enabled_for_add(enabled=enabled)
        if resolved_enabled is not None:
            other["enabled"] = resolved_enabled
        self._G.add_edge(s, d, e, **other, event=e, guard=guard)

    def add_initial_state(self, states):
        """
    Add initial state(s) of the automaton.
        :param states: States to add.
        """
        if type(states) is str:
            states = (states,)
        for s in states:
            if s is not None and s not in self.q0:
                self.q0[s] = None
            self._G.add_node(s)

    def add_final_state(self, states):
        """
    Add final state(s) of the automaton.
        :param states: States to add.
        """
        if type(states) is str:
            states = (states,)
        for s in states:
            if s is not None and s not in self.final_q:
                self.final_q[s] = None
            self._G.add_node(s)

    def is_transition(self, s, d, e):
        """
        Check if a transition (s, d, e) exists in the automaton.
        :param s: Source.
        :param d: Destination.
        :param e: Event.
        :return: True if such a transition exists, False otherwise.
        """
        transitions = [trans for trans in self.T.values() if trans['source'] == s and
                       trans['destination'] == d and trans['event'] == e]

        is_transition = len(transitions) != 0
        return is_transition

    # def num_occur(self, q, e):
    #     tr = self.get_transition(q, e=e)
    #     if tr[-1]:
    #         return len(tr[-1]['timing'])
    #     else:
    #         return ""

    def num_occur(self, tr):
        return self._timing_count_for_transition_data(tr[-1])

    def num_timings(self):
        return sum(self._timing_count_for_transition_data(tr[-1]) for tr in self.get_transitions())

    def get_num_in(self, q):
        """
        Returns the number of in transitions of state q in the automaton.
        :return: number of transitions.
        """
        if self._G.has_node(q):
            return self._G.in_degree(q)
        else:
            raise Exception(f'State {q} not in the automaton.')

    def get_num_out(self, q):
        """
        Returns the number of out transitions of state q in the automaton.
        :return: number of transitions.
        """
        if self._G.has_node(q):
            return self._G.out_degree(q)
        else:
            raise Exception(f'State {q} not in the automaton.')

    def is_state(self, q, active_only=False):
        q = self._normalize_state_ref(q)
        return self._G.has_node(q) and (not active_only or self._state_allowed(q))

    def remove_state(self, s):
        self._G.remove_node(s)
        if s in self.q0:
            self.q0.pop(s)

    def in_transitions(self, s, event=None):
        """
    Get all incoming transitions of state s.
        :param s:
        :return:
        """
        s = self._normalize_state_ref(s)
        transitions = list(self._G.in_edges(s, data=True, keys=True))
        if event is not None:
            transitions = [e for e in transitions if e[3].get("event", e[2]) == event]
        return [e for e in transitions if self._transition_allowed(*e)]

    def out_transitions(self, s, event=None):
        """
    Get all outgoing transitions of state s.
        :param event:
        :param s:
        :return:
        """
        s = self._normalize_state_ref(s)
        transitions = list(self._G.out_edges(s, data=True, keys=True))
        if event is not None:
            transitions = [e for e in transitions if e[3].get("event", e[2]) == event]
        return [e for e in transitions if self._transition_allowed(*e)]

    def discrete_event_dynamics(self, q, xt, xk, p) -> tuple:
        e = p["event"]
        new_q = self.UNKNOWN_STATE
        transitions = self.out_transitions(q, event=e)
        if len(transitions) == 1:
            new_q = transitions[0][1]
        else:
            stoch_dest = []
            for source, d, key, data in transitions:
                p_weight = self._resolve_transition_weight(source, d, key, data, default=0.0)
                stoch_dest.append((d, max(0.0, p_weight)))
            if stoch_dest:
                if any(x[1] > 0 for x in stoch_dest):
                    new_q = random.choices([x[0] for x in stoch_dest], weights=[x[1] for x in stoch_dest])[0]
                else:
                    new_q = random.choice(stoch_dest)[0]
            else:  # try to recover
                dests = set(
                    d for s, d, k, data in self.get_transitions(active_only=True)
                    if data.get("event", k) == e
                )
                if len(dests) == 1:
                    new_q = dests.pop()
        return (new_q,), None, None

    def guards(self, q, x):
        pass

    def timed_event(self, q, xc, xd):
        q = self._normalize_state_ref(q)
        possible_destinations = list(self.out_transitions(q))
        if possible_destinations:
            if len(possible_destinations) == 1:
                dest = possible_destinations[0]
            else:
                probs = [
                    max(0.0, self._resolve_transition_weight(s, d, key, data, default=1.0))
                    for s, d, key, data in possible_destinations
                ]
                total = sum(probs)
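                if total > 0:
                    probs = [p / total for p in probs]
                    # Sample an index: np.random.choice needs a 1-D input, not a list of tuples.
                    idx = np.random.choice(len(possible_destinations), p=probs)
                else:
                    idx = np.random.choice(len(possible_destinations))
                dest = possible_destinations[idx]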
            source, destination, key, data = dest
            time = self._resolve_transition_time(source, destination, key, data)
            if time is not None:
                return time, destination
        return None, None

    def get_transition(self, s, d=None, e=None, if_more_than_one='raise'):
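        """
        Get the transitions with source state s, optionally filtered by destination state d and event e.
        :param s: Source state.
        :param d: Destination state.
        :param e: Event.
        :param if_more_than_one: If 'raise', an exception is raised when several transitions match; otherwise the
            list of matching transitions is returned.
        :return: The single matching transition, None if nothing matches, or a list (see if_more_than_one).
        """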
        transitions = list(self.out_transitions(s))
        if e is None and d is not None:
            transitions = [trans for trans in transitions if trans[1] == d]
        elif d is None and e is not None:
            transitions = [trans for trans in transitions if trans[3].get("event", trans[2]) == e]
        elif d is not None and e is not None:
            transitions = [trans for trans in transitions if trans[1] == d and trans[3].get("event", trans[2]) == e]
        else:
            transitions = transitions

        if len(transitions) > 1:
            if if_more_than_one == 'raise':
                raise Exception('There are multiple transitions which satisfy the condition.')
            else:
                return transitions
        elif len(transitions) == 0:
            return None
        else:
            return transitions[0]

    def rename_events(self, prefix="e_"):
        """
        Rename events to e_0, e_1, ... (using the given prefix). The old id is stored in the field 'old_symbol'
        of the event data.
        """
        i = 0
        new_events_dict = OrderedDict()
        for k, v in self.Sigma.items():
            new_key = f'{prefix}{i}'
            new_value = v
            if new_value is None:
                new_value = {}
            new_value['old_symbol'] = k
            i += 1
            new_events_dict[new_key] = new_value
            for t in self.T.values():
                if t['event'] == k:
                    t['event'] = new_key
        # self.Sigma = new_events_dict

    def step(self, q, x0, t, u):
        """
        Simulates one time step of continuous behavior from t to t+dt. The underlying function is solve_ivp
        with method 'RK23'.
        :param q: Current discrete state.
        :param x0: Initial state vector.
        :param t: Time at the start of the step.
        :param u: Extra arguments for the flow function (forwarded to solve_ivp as `args`).
        :return: Time t+dt, value of state at t+dt
        """
        s = solve_ivp(self.flow, t_span=(t, t + self.dt), y0=x0, method='RK23', args=u)
        xc = s.y[:, -1]
        t = s.t[-1]

        xk = self.time_discrete_dynamics(t, q, x0)
        # np.concatenate expects a sequence of arrays as its first argument.
        return t, np.concatenate((xc, xk))

    def __str__(self):
        """
    String representation of the automaton.
        :return:
        """
        return f"""Automaton:
    Number of states: {self.num_modes}
    Number of transitions: {self.num_transitions}
    Configuration: {self.configuration}
    Initial state(s): {list(self.q0.keys())}
    Final state(s): {list(self.final_q.keys())}"""

    def flow(self, q, p, x, u):
        """
    Flow equation gives derivative of the continuous variables.
        :param q: Current discrete state of the model.
        :param p: Stochastic parameters generated on entry to state current_q.
        :param x: Current continuous state of the model.
        :param u: Calculated internal i signals.
        :return: Derivative of the continuous state x.
        """
        pass

    def inv(self, t, q, x, y, z, p):
        """
    Invariants.
        :param t:
        :param q:
        :param x:
        :param y:
        :param z:
        :param p:
        """
        pass

    def get_transitions(self, active_only=False):
        transitions = list(self._G.edges(data=True, keys=True))
        if active_only:
            transitions = [tr for tr in transitions if self._transition_allowed(*tr)]
        return transitions

    def print_state(self, v):
        """Prints outgoing transitions of a state v.

        Args:
            v (state): 

        Returns:
            String: Description of the outgoing transitions of the state.
        """
        s = f'<b>{str(v)}</b>'
        for tr in self.out_transitions(v):
            try:
                num_occur = f'[{self.num_occur(tr)}]'
            except:
                num_occur = '/'
            s += f"<br>{tr[2]} -> {tr[1]} {num_occur}"
        return s

    def sample_initial(self):
        candidates = [q for q in self.q0.keys() if self._state_allowed(q)]
        if len(candidates) == 0:
            candidates = self.active_states
        if len(candidates) == 0:
            candidates = list(self.discrete_states)
        if len(self.q0) == 0:
            current_q = np.random.choice(candidates, 1)[-1]
            warnings.warn(
                'Initial state not defined, sampling initial state uniformly from the set of all states.')
        else:
            current_q = np.random.choice(candidates, 1)[-1]
        return current_q

    # def simulate(self, finish_time=100, current_q=None):
    #     """
    #     Simulates behaviour of the system.
    #     :param finish_time: Time when simulation finishes.
    #     :return: generated data.
    #     """
    #
    #     if current_q is None:
    #         if len(self.q0) == 0:
    #             current_q = np.random.choice(list(self.discrete_states.keys()), 1)[-1]
    #             warnings.warn(
    #                 'Initial state not defined, sampling initial state uniformly from the set of all states.')
    #         else:
    #             current_q = np.random.choice(list(self.q0.keys()), 1)[-1]
    #
    #     state_sequence = []
    #     data_state = []
    #     t = 0
    #     last_x = None
    #     last_output = None
    #     states = []
    #     data = []
    #     current_e = None
    #
    #     while True:
    #         cont_state, cont_time, cont_output = self.__step_continuous()
    #         last_x = dict(cont_state.iloc[-1])
    #         last_output = dict(cont_output.iloc[-1])
    #         cont_time = cont_time + t - cont_time.iloc[0, 0]
    #         clock = cont_time.iloc[-1, 0] - cont_time.iloc[0, 0]
    #
    #         current_state = current_q
    #
    #         observed_current_state = current_state
    #         state_sequence.append(
    #             pd.DataFrame(np.full((cont_time.size - 1, 3), (current_state, observed_current_state, current_e)),
    #                          index=cont_time.iloc[0:-1, 0],
    #                          columns=['State', 'Observed State', 'Event']))
    #
    #         data_state.append(cont_output.iloc[:-1].set_index(cont_time.iloc[0:-1, 0]))
    #
    #         tr = self.out_transitions(current_state)
    #
    #         if len(tr) == 0:
    #             break
    #         elif len(tr) != 1:
    #             tr = np.random.choice(tr, 1)[-1]
    #             warnings.warn('Multiple transitions can occur.')
    #         else:
    #             tr = tr[0]
    #
    #         last_q = current_q
    #         current_e = tr.event
    #         self.apply_sim_event(current_e)
    #         if cont_time.size == 0:
    #             break
    #         t += clock
    #
    #         if t >= finish_time:
    #             break
    #
    #         states.append(pd.concat(state_sequence, axis=0))
    #         data.append(pd.concat(data_state, axis=0))
    #     return states, data

    def predict_state(self, data_collection, time_col_name=None, discr_col_names=None):
        for data in data_collection:
            data["StateEstimate"] = None
            data["Event"] = None

            prev_discr_state = None
            prev_time = None

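            if discr_col_names is None:
                # Exclude the two result columns that were added above.
                discr_col_names = [c for c in data.columns if c not in ("StateEstimate", "Event")]
            if time_col_name is not None:
                # Remove the time column; `-=` with a string does not drop a column name.
                discr_col_names = [c for c in discr_col_names if c != time_col_name]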

            for row in data[discr_col_names].itertuples(index=True):
                if time_col_name is not None:
                    time = data[time_col_name].iloc[row[0]]
                else:
                    time = row[0]
                discr_state = row[1:]
                if prev_discr_state is not None and prev_discr_state != discr_state:
                    event = np.asarray(discr_state) - np.asarray(prev_discr_state)
                    event = ' '.join(str(x) for x in event)
                    data.loc[row[0], "Event"] = event

                data.loc[row[0], "StateEstimate"] = signal_vector_to_state(discr_state)
                prev_discr_state = discr_state
                prev_time = time
        return data_collection


    def read_event(self, t, e, clear_p=False, keep_p=None, **kwargs):
        if keep_p:
            for k in keep_p:
                kwargs[k] = self._p[k]
        if clear_p or keep_p:
            self._p = kwargs
        else:
            self._p.update(kwargs)
        self._p.pop('Error Message', None)
        self._t = t

        self._p['event'] = e

        new_q, self._xt, self._xk = self.discrete_event_dynamics(self._q, self._xt, self._xk, self._p)
        new_q = new_q[0]

        if new_q == self.UNKNOWN_STATE:
            text2 = '{}->{}->{}'.format(self.discrete_state, e, new_q)
            self._p['Error Message'] = text2
        else:
            dl = self.decision_logic
            # d = self.check_decisions(self.x, self.overall_system.state)
            if dl and not self.overall_system.is_unknown(self):
                try:
                    state_key = json.dumps(self.overall_system.get_choice_state(self), sort_keys=True)
                except:
                    print('Problem getting choice state')
                old_e = self.choices_set.get(state_key, None)
                if old_e is not None:
                    if self._e not in old_e:
                        # warnings.warn(
                        #     '{}: Different decision for same state {}: {} vs {}'.format(t, state_key, old_e, self._e))
                        self.choices_set[state_key][self._e] = [str(t)]
                    else:
                        self.choices_set[state_key][self._e].append(str(t))
                else:
                    self.choices_set[state_key] = {self._e: [str(t)]}
                # if self._e not in (x[0] for x in d):
                #     warnings.warn('{}: Not allowed decision in state {}: {}'.format(t, state_key, self._e))

        self.discrete_state = new_q
        self._past_t.append(t)
        self._past_p.append(self._p)
        if len(self._discrete_state_data):
            self._discrete_state_data[-1]["Finish"] = t
        self._discrete_state_data.append(dict(Time=t, Event=e, Mode=self.discrete_state, **self._p))
        self._discrete_output_data.append([t, *self._d, self._p.get('event', None)])

num_modes property

Returns the number of modes (states) in the automaton.

num_transitions property

Returns the number of transitions in the automaton.

state property writable

The discrete state of an automaton is univariate.

__init__(states=None, transitions=None, configuration=None, unknown_state='raise', id='', initial_q=(), initial_r=None, final_q=(), super_states=(), decision_states=(), **kwargs)

Class initialization from lists of elements.
:param states: Discrete states / modes of continuous behavior.
:param events: The events that trigger state transitions.
:param transitions: The transition information. If a collection of dicts is given, each dict should contain "source", "dest" (or "destination") and "event"; the other attributes are added as data of that transition. Alternatively, a collection of tuples of the form (source, event, dest, *) can be used.
:param unknown_state: The name of the unknown state during "play in"; if "raise", an exception will be raised.
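
As a rough illustration of the two transition formats described above, the following standalone sketch mirrors how the constructor unpacks each entry. The helper `normalize_constructor_transition` and the state and event names are hypothetical and not part of ml4cps. The point to notice is that constructor tuples are read as (source, event, destination), whereas `add_transitions_from` reads tuples as (source, destination, event).

```python
def normalize_constructor_transition(tr):
    """Return (source, destination, event, data) for one constructor-style entry.

    Hypothetical helper: it only imitates the unpacking done in __init__.
    """
    if isinstance(tr, dict):
        data = dict(tr)  # copy so the caller's dict is not mutated
        source = data.pop("source")
        destination = data.pop("destination", data.pop("dest", None))
        if destination is None:
            raise ValueError('Transition dict must contain "destination" or "dest".')
        event = data.pop("event")
        return source, destination, event, data
    if len(tr) < 3:
        raise ValueError("Transition tuple must contain at least (source, event, destination).")
    # Constructor tuples put the event in the middle.
    source, event, destination = tr[0], tr[1], tr[2]
    data = dict(tr[3]) if len(tr) > 3 and isinstance(tr[3], dict) else {}
    return source, destination, event, data


specs = [
    {"source": "idle", "dest": "heating", "event": "on", "guard": None},
    ("heating", "off", "idle"),
    ("idle", "tick", "idle", {"probability": 0.5}),
]
normalized = [normalize_constructor_transition(tr) for tr in specs]
```

Each normalized entry corresponds to one call of `add_transition(source, destination, event, **data)` in the constructor.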

Source code in ml4cps/automata/base.py
def __init__(self, states: list = None, transitions: list = None, configuration=None,
             unknown_state: str = 'raise', id="", initial_q=(), initial_r=None, final_q=(), super_states=(), decision_states=(),
             **kwargs):
    """
    Class initialization from lists of elements.
    :param states: Discrete states / modes of continuous behavior.
    :param events: The events that trigger state transitions.
    :param transitions: The transition information. If a collection of dicts is given, each dict should contain
    "source", "dest" (or "destination") and "event"; the other attributes are added as data of that transition.
    Alternatively, a collection of tuples of the form (source, event, dest, *) can be used.
    :param unknown_state: The name of the unknown state during "play in"; if "raise", an exception will be raised.
    """
    self._G = nx.MultiDiGraph()
    if initial_q and isinstance(initial_q, str):
        initial_q = [initial_q]
    self.q0 = OrderedDict.fromkeys(initial_q)
    self.initial_r = initial_r
    if final_q and isinstance(final_q, str):
        final_q = [final_q]
    self.final_q = OrderedDict.fromkeys(final_q)
    self.previous_node_positions = None
    self.UNKNOWN_STATE = unknown_state
    self.configuration = configuration
    if super_states is not None:
        if type(super_states) is str:
            self.__super_states = [super_states]
        else:
            self.__super_states = list(super_states)

        # self._G.add_nodes_from(self.__super_states)

    if decision_states is not None:
        if type(decision_states) is str:
            self.decision_states = [decision_states]
        else:
            self.decision_states = list(decision_states)

    if states is not None:
        for state in states:
            if isinstance(state, dict):
                state_data = dict(state)
                state_name = state_data.pop("name", state_data.pop("state", None))
                if state_name is None:
                    raise ValueError('State dict must contain "name" or "state".')
                if "configs" in state_data:
                    raise ValueError('State dict field "configs" is not supported. Use "enabled" instead.')
                self.add_state(state_name, **state_data)
            else:
                self.add_state(state)

    if transitions is not None:
        for tr in transitions:
            if type(tr) is dict:
                tr_data = dict(tr)
                source = tr_data.pop("source")
                destination = tr_data.pop("destination", tr_data.pop("dest", None))
                if destination is None:
                    raise ValueError('Transition dict must contain "destination" or "dest".')
                event = tr_data.pop("event")
                guard = tr_data.pop("guard", None)
                if "configs" in tr_data:
                    raise ValueError('Transition dict field "configs" is not supported. Use "enabled" instead.')
                self.add_transition(source, destination, event, guard=guard, **tr_data)
            else:
                if len(tr) < 3:
                    raise ValueError("Transition tuple must contain at least (source, event, destination).")
                source, event, destination = tr[0], tr[1], tr[2]
                transition_data = tr[3] if len(tr) > 3 and isinstance(tr[3], dict) else {}
                self.add_transition(source, destination, event, **transition_data)

    if 'discr_state_names' not in kwargs:
        kwargs['discr_state_names'] = ['Mode']
    elif type(kwargs['discr_state_names']) is str:
        kwargs['discr_state_names'] = [kwargs['discr_state_names']]
    CPSComponent.__init__(self, id, unknown_state=unknown_state, **kwargs)

__str__()

String representation of the automaton.

Source code in ml4cps/automata/base.py
def __str__(self):
    """
    String representation of the automaton.
    """
    return f"""Automaton:
Number of states: {self.num_modes}
Number of transitions: {self.num_transitions}
Configuration: {self.configuration}
Initial state(s): {list(self.q0.keys())}
Final state(s): {list(self.final_q.keys())}"""

add_final_state(states)

Add final state(s) of the automaton. :param states: States to add.

Source code in ml4cps/automata/base.py
def add_final_state(self, states):
    """
    Add final state(s) of the automaton.
    :param states: States to add.
    """
    if type(states) is str:
        states = (states,)
    for s in states:
        if s is not None and s not in self.final_q:
            self.final_q[s] = None
        self._G.add_node(s)

add_initial_state(states)

Add initial state(s) of the automaton. :param states: States to add.

Source code in ml4cps/automata/base.py
def add_initial_state(self, states):
    """
    Add initial state(s) of the automaton.
    :param states: States to add.
    """
    if type(states) is str:
        states = (states,)
    for s in states:
        if s is not None and s not in self.q0:
            self.q0[s] = None
        self._G.add_node(s)

add_state(new_state, enabled=None, **kwargs)

Add state to the automaton. :param new_state: State to be added.

Source code in ml4cps/automata/base.py
def add_state(self, new_state, enabled=None, **kwargs):
    """
    Add state to the automaton.
    :param new_state: State to be added.
    """
    resolved_enabled = self._resolve_enabled_for_add(enabled=enabled)
    if resolved_enabled is not None:
        kwargs["enabled"] = resolved_enabled
    self._G.add_node(new_state, **kwargs)

add_state_data(s, d)

Add data to state s of the automaton.
:param s: State.
:param d: Data to be attached to s.

Source code in ml4cps/automata/base.py
def add_state_data(self, s: str, d: object):
    """
    Add data to state s of the automaton.
    :param s: State.
    :param d: Data to be attached to s.
    """
    self.Q[s] = d

add_states_from(new_state, enabled=None, **kwargs)

Add multiple states to the automaton. :param new_state: States to be added.
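
The accepted entry formats are easiest to see side by side. The sketch below is a standalone imitation of the dispatch performed by `add_states_from`, not the library code itself: `normalize_state_spec`, the state names, the attribute names and the values given for `enabled` are all hypothetical placeholders.

```python
def normalize_state_spec(state, enabled=None, **defaults):
    """Return (name, enabled, data) for one entry of the states collection.

    Hypothetical helper that imitates the branching in add_states_from.
    """
    if isinstance(state, dict):
        data = dict(state)  # copy so the caller's dict is left untouched
        name = data.pop("name", data.pop("state", None))
        if name is None:
            raise ValueError('State dict must contain "name" or "state".')
    elif isinstance(state, tuple) and len(state) == 2 and isinstance(state[1], dict):
        name, data = state[0], dict(state[1])
    else:
        # Any other object is used as the state itself, with the shared defaults.
        return state, enabled, dict(defaults)
    if "configs" in data:
        raise ValueError('State dict field "configs" is not supported. Use "enabled" instead.')
    state_enabled = data.pop("enabled", enabled)  # per-state value wins over the shared one
    merged = dict(defaults)
    merged.update(data)  # per-state attributes override shared keyword arguments
    return name, state_enabled, merged


plain = normalize_state_spec("q0", color="grey")
as_dict = normalize_state_spec({"name": "q1", "enabled": "cfg_a", "color": "red"}, color="grey")
as_pair = normalize_state_spec(("q2", {"label": "Run"}), enabled="cfg_b", color="grey")
```

Note that `add_states_from` also wraps a single string argument in a list first, so passing `"q0"` adds one state rather than iterating over its characters.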

Source code in ml4cps/automata/base.py
def add_states_from(self, new_state, enabled=None, **kwargs):
    """
    Add multiple states to the automaton.
    :param new_state: States to be added.
    """
    if isinstance(new_state, str):
        new_state = [new_state]
    for state in new_state:
        if isinstance(state, dict):
            state_data = dict(state)
            state_name = state_data.pop("name", state_data.pop("state", None))
            if state_name is None:
                raise ValueError('State dict must contain "name" or "state".')
            if "configs" in state_data:
                raise ValueError('State dict field "configs" is not supported. Use "enabled" instead.')
            state_enabled = state_data.pop("enabled", enabled)
            merged_state_data = dict(kwargs)
            merged_state_data.update(state_data)
            self.add_state(state_name, enabled=state_enabled, **merged_state_data)
        elif isinstance(state, tuple) and len(state) == 2 and isinstance(state[1], dict):
            state_data = dict(state[1])
            if "configs" in state_data:
                raise ValueError('State dict field "configs" is not supported. Use "enabled" instead.')
            state_enabled = state_data.pop("enabled", enabled)
            merged_state_data = dict(kwargs)
            merged_state_data.update(state_data)
            self.add_state(state[0], enabled=state_enabled, **merged_state_data)
        else:
            self.add_state(state, enabled=enabled, **kwargs)

add_transition(s, d, e, enabled=None, guard=None, **other)

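Add a single transition.
:param s: Source state.
:param d: Destination state.
:param e: Event; also used as the key of the edge.
:param enabled: Optional value resolved by _resolve_enabled_for_add and stored on the transition if not None.
:param guard: Optional guard stored on the transition.
:param other: Additional attributes stored as data of the transition.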

Source code in ml4cps/automata/base.py
def add_transition(self, s, d, e, enabled=None, guard=None, **other):
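    """
    Add a single transition.
    :param s: Source state.
    :param d: Destination state.
    :param e: Event; also used as the key of the edge.
    :param enabled: Optional value resolved by _resolve_enabled_for_add and stored on the transition if not None.
    :param guard: Optional guard stored on the transition.
    :param other: Additional attributes stored as data of the transition.
    """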
    resolved_enabled = self._resolve_enabled_for_add(enabled=enabled)
    if resolved_enabled is not None:
        other["enabled"] = resolved_enabled
    self._G.add_edge(s, d, e, **other, event=e, guard=guard)

add_transitions_from(list_of_tuples, **other)

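Add multiple transitions. :param list_of_tuples: List of transitions. Each item is either a tuple (source_state, destination_state, event[, attributes_dict]) or a dict with the keys "source", "destination" (or "dest") and "event". :param other: Attributes applied to every added transition; per-transition attributes take precedence.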

Source code in ml4cps/automata/base.py
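def add_transitions_from(self, list_of_tuples, **other):
    """
    Add multiple transitions.
    :param list_of_tuples: List of transitions. Each item is either a tuple
        (source_state, destination_state, event[, attributes_dict]) or a dict with the keys
        "source", "destination" (or "dest") and "event".
    :param other: Attributes applied to every added transition; per-transition attributes take precedence.
    """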
    for tr in list_of_tuples:
        if isinstance(tr, dict):
            tr_data = dict(other)
            tr_data.update(tr)
            source = tr_data.pop("source")
            destination = tr_data.pop("destination", tr_data.pop("dest", None))
            if destination is None:
                raise ValueError('Transition dict must contain "destination" or "dest".')
            event = tr_data.pop("event")
            if "configs" in tr_data:
                raise ValueError('Transition dict field "configs" is not supported. Use "enabled" instead.')
            enabled = tr_data.pop("enabled", None)
            guard = tr_data.pop("guard", None)
            self.add_transition(source, destination, event, enabled=enabled, guard=guard, **tr_data)
        else:
            if len(tr) < 3:
                raise ValueError("Transition tuple must contain at least (source, destination, event).")
            source, destination, event = tr[0], tr[1], tr[2]
            tr_data = dict(other)
            if len(tr) > 3 and isinstance(tr[3], dict):
                tr_data.update(tr[3])
            self.add_transition(source, destination, event, **tr_data)
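As a standalone illustration of the two accepted item shapes, the following sketch normalizes a transition spec into `(source, destination, event, attrs)`. The helper `normalize_transition_spec` is hypothetical and not part of ml4cps; unlike the listing above, it leaves "enabled" and "guard" inside the attribute dict instead of passing them on separately.

```python
def normalize_transition_spec(tr, **other):
    """Hypothetical helper: return (source, destination, event, attrs)."""
    if isinstance(tr, dict):
        data = dict(other)
        data.update(tr)  # per-transition values override shared keyword arguments
        source = data.pop("source")
        # "destination" takes precedence; "dest" is the fallback key.
        fallback = data.pop("dest", None)
        destination = data.pop("destination", fallback)
        if destination is None:
            raise ValueError('Transition dict must contain "destination" or "dest".')
        event = data.pop("event")
        return source, destination, event, data
    if len(tr) < 3:
        raise ValueError("Transition tuple must contain at least (source, destination, event).")
    data = dict(other)
    if len(tr) > 3 and isinstance(tr[3], dict):
        data.update(tr[3])
    return tr[0], tr[1], tr[2], data


a = normalize_transition_spec(("q0", "q1", "start"), guard=None)
b = normalize_transition_spec({"source": "q1", "dest": "q2", "event": "stop", "enabled": True})
c = normalize_transition_spec(("q2", "q0", "reset", {"timing": [1.5]}))
```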

flow(q, p, x, u)

Flow equation gives the derivative of the continuous variables. :param q: Current discrete state of the model. :param p: Stochastic parameters generated on entry to state q. :param x: Current continuous state of the model. :param u: Calculated internal input signals. :return: Derivative of the continuous state x.

Source code in ml4cps/automata/base.py
def flow(self, q, p, x, u):
    """
    Flow equation gives the derivative of the continuous variables.
    :param q: Current discrete state of the model.
    :param p: Stochastic parameters generated on entry to state q.
    :param x: Current continuous state of the model.
    :param u: Calculated internal input signals.
    :return: Derivative of the continuous state x.
    """
    pass

get_num_in(q)

Returns the number of incoming transitions of state q in the automaton. :param q: State. :return: Number of incoming transitions.

Source code in ml4cps/automata/base.py
def get_num_in(self, q):
    """
    Returns the number of incoming transitions of state q in the automaton.
    :param q: State.
    :return: Number of incoming transitions.
    """
    if self._G.has_node(q):
        return self._G.in_degree(q)
    else:
        raise Exception(f'State {q} not in the automaton.')

get_num_out(q)

Returns the number of outgoing transitions of state q in the automaton. :param q: State. :return: Number of outgoing transitions.

Source code in ml4cps/automata/base.py
def get_num_out(self, q):
    """
    Returns the number of outgoing transitions of state q in the automaton.
    :param q: State.
    :return: Number of outgoing transitions.
    """
    if self._G.has_node(q):
        return self._G.out_degree(q)
    else:
        raise Exception(f'State {q} not in the automaton.')

get_transition(s, d=None, e=None, if_more_than_one='raise')

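Get the transitions with source state s, optionally filtered by destination state d and event e. :param s: Source state. :param d: Destination state. If None, the destination is not filtered. :param e: Event. If None, the event is not filtered. :param if_more_than_one: If 'raise' (default), an exception is raised when several transitions match; any other value returns the list of matching transitions. :return: None if no transition matches, the transition itself if exactly one matches, otherwise the list of matching transitions.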

Source code in ml4cps/automata/base.py
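def get_transition(self, s, d=None, e=None, if_more_than_one='raise'):
    """
    Get the transitions with source state s, optionally filtered by destination state d and event e.
    :param s: Source state.
    :param d: Destination state. If None, the destination is not filtered.
    :param e: Event. If None, the event is not filtered.
    :param if_more_than_one: If 'raise' (default), an exception is raised when several transitions match;
        any other value returns the list of matching transitions.
    :return: None if no transition matches, the transition itself if exactly one matches,
        otherwise the list of matching transitions.
    """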
    transitions = list(self.out_transitions(s))
    if e is None and d is not None:
        transitions = [trans for trans in transitions if trans[1] == d]
    elif d is None and e is not None:
        transitions = [trans for trans in transitions if trans[3].get("event", trans[2]) == e]
    elif d is not None and e is not None:
        transitions = [trans for trans in transitions if trans[1] == d and trans[3].get("event", trans[2]) == e]
    else:
        transitions = transitions

    if len(transitions) > 1:
        if if_more_than_one == 'raise':
            raise Exception('There are multiple transitions which satisfy the condition.')
        else:
            return transitions
    elif len(transitions) == 0:
        return None
    else:
        return transitions[0]
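The three possible outcomes (no match, one match, several matches) can be reproduced on a plain list of `(source, destination, key, data)` tuples. This is a minimal standalone sketch; `select_transition` is a hypothetical name, and it raises `ValueError` where the listing above raises a generic `Exception`.

```python
def select_transition(transitions, d=None, e=None, if_more_than_one="raise"):
    """Hypothetical stand-in for the filtering and result handling shown above."""
    def event_of(tr):
        # The event is read from the data dict, falling back to the edge key.
        return tr[3].get("event", tr[2])

    matches = [tr for tr in transitions
               if (d is None or tr[1] == d) and (e is None or event_of(tr) == e)]
    if len(matches) > 1:
        if if_more_than_one == "raise":
            raise ValueError("There are multiple transitions which satisfy the condition.")
        return matches
    return matches[0] if matches else None


out = [
    ("q0", "q1", "a", {"event": "a"}),
    ("q0", "q1", "b", {}),            # no "event" entry: the key "b" is used
    ("q0", "q2", "a", {"event": "a"}),
]
one = select_transition(out, d="q2")
by_event = select_transition(out, e="b")
missing = select_transition(out, d="q3")
many = select_transition(out, d="q1", if_more_than_one="return")
```

Callers therefore need to handle three return types: `None`, a single tuple, or a list of tuples.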

in_transitions(s, event=None)

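Get all incoming transitions of state s. :param s: State. :param event: If given, only transitions with this event are returned. :return: List of the allowed incoming transitions.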

Source code in ml4cps/automata/base.py
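def in_transitions(self, s, event=None):
    """
    Get all incoming transitions of state s.
    :param s: State.
    :param event: If given, only transitions with this event are returned.
    :return: List of the allowed incoming transitions.
    """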
    s = self._normalize_state_ref(s)
    transitions = list(self._G.in_edges(s, data=True, keys=True))
    if event is not None:
        transitions = [e for e in transitions if e[3].get("event", e[2]) == event]
    return [e for e in transitions if self._transition_allowed(*e)]

inv(t, q, x, y, z, p)

Invariants. :param t: :param q: :param x: :param y: :param z: :param p:

Source code in ml4cps/automata/base.py
def inv(self, t, q, x, y, z, p):
    """
    Invariants.
    :param t:
    :param q:
    :param x:
    :param y:
    :param z:
    :param p:
    """
    pass

is_transition(s, d, e)

Check if a transition (s, d, e) exists in the automaton. :param s: Source. :param d: Destination. :param e: Event. :return: True if at least one matching transition exists, otherwise False.

Source code in ml4cps/automata/base.py
def is_transition(self, s, d, e):
    """
    Check if a transition (s, d, e) exists in the automaton.
    :param s: Source.
    :param d: Destination.
    :param e: Event.
    :return: True if at least one matching transition exists, otherwise False.
    """
    transitions = [trans for trans in self.T.values() if trans['source'] == s and
                   trans['destination'] == d and trans['event'] == e]

    is_transition = len(transitions) != 0
    return is_transition

merge(q1, q2)

Merge state q2 into state q1. Incoming and outgoing transitions of q2 are redirected to q1 (self-loops on q2 become self-loops on q1), and q2 is then removed from the automaton.

Source code in ml4cps/automata/base.py
def merge(self, q1, q2):
    """
    Merge state q2 into state q1. Incoming and outgoing transitions of q2 are redirected
    to q1 (self-loops on q2 become self-loops on q1), and q2 is then removed from the
    automaton.
    """
    intr = self.in_transitions(q2)
    # outtr_q1 = list(self.out_transitions(q1))
    outtr_q2 = list(self.out_transitions(q2))
    # nx.contracted_nodes(self._G, w, v, copy=False)
    # set event

    for tr in intr:
        if tr[0] != q2:
            self.add_single_transition(tr[0], q1, tr[2], timing=tr[-1]['timing'])
    # if q1=='q0' and q2 == 'q192':
    #     print('found')
    for tr in outtr_q2:
        dest = tr[1]
        if dest == q2:
            dest = q1
        self.add_single_transition(q1, dest, tr[2], timing=tr[-1]['timing'])
    self.remove_state(q2)
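The redirection performed by the merge can be followed on a plain edge list. The sketch below is standalone and simplified: `merge_states` is a hypothetical function, timing data is omitted, and dropping duplicate edges is an assumption about how repeated transitions are handled.

```python
def merge_states(edges, q1, q2):
    """Return a new edge list in which state q2 has been merged into q1.

    edges is a list of (source, destination, event) tuples.
    """
    merged = []
    for source, dest, event in edges:
        # Every endpoint that referred to q2 now refers to q1.
        source = q1 if source == q2 else source
        dest = q1 if dest == q2 else dest
        if (source, dest, event) not in merged:  # assumption: duplicates collapse
            merged.append((source, dest, event))
    return merged


edges = [
    ("q0", "q1", "a"),
    ("q0", "q2", "b"),
    ("q2", "q2", "c"),  # self-loop on q2
    ("q2", "q3", "d"),
    ("q1", "q3", "d"),
]
result = merge_states(edges, "q1", "q2")
```

After the merge, the self-loop on q2 has become a self-loop on q1, and the two `d` transitions into q3 coincide.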

out_transitions(s, event=None)

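Get all outgoing transitions of state s. :param s: State. :param event: If given, only transitions with this event are returned. :return: List of the allowed outgoing transitions.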

Source code in ml4cps/automata/base.py
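def out_transitions(self, s, event=None):
    """
    Get all outgoing transitions of state s.
    :param s: State.
    :param event: If given, only transitions with this event are returned.
    :return: List of the allowed outgoing transitions.
    """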
    s = self._normalize_state_ref(s)
    transitions = list(self._G.out_edges(s, data=True, keys=True))
    if event is not None:
        transitions = [e for e in transitions if e[3].get("event", e[2]) == event]
    return [e for e in transitions if self._transition_allowed(*e)]

print_state(v)

Builds an HTML description of the outgoing transitions of a state v.

Parameters:

Name  Type   Description                                      Default
v     state  State whose outgoing transitions are described.  required

Returns:

Type  Description
str   HTML-formatted description of the outgoing transitions of the state.

Source code in ml4cps/automata/base.py
def print_state(self, v):
    """Builds an HTML description of the outgoing transitions of a state v.

    Args:
        v (state): State whose outgoing transitions are described.

    Returns:
        str: HTML-formatted description of the outgoing transitions of the state.
    """
    s = f'<b>{str(v)}</b>'
    for tr in self.out_transitions(v):
        try:
            num_occur = f'[{self.num_occur(tr)}]'
        except Exception:
            num_occur = '/'
        s += f"<br>{tr[2]} -> {tr[1]} {num_occur}"
    return s
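The shape of the returned string is easiest to see on a small example. The following standalone sketch uses a hypothetical `describe_state` function with an explicit `counts` dict in place of `num_occur`; both names are assumptions made for illustration.

```python
def describe_state(v, out_transitions, counts=None):
    """Build '<b>state</b><br>event -> dest [count]' for each outgoing transition.

    out_transitions is a list of (source, destination, event) tuples; counts maps
    such tuples to occurrence counts. A missing count is rendered as '/'.
    """
    counts = counts or {}
    s = f"<b>{v}</b>"
    for source, dest, event in out_transitions:
        n = counts.get((source, dest, event))
        label = f"[{n}]" if n is not None else "/"
        s += f"<br>{event} -> {dest} {label}"
    return s


html = describe_state(
    "q0",
    [("q0", "q1", "a"), ("q0", "q2", "b")],
    counts={("q0", "q1", "a"): 3},
)
```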

remove_transition(source, dest)

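Remove the transition from source to dest. If a configuration is active, only the entries of that configuration are dropped from the dict-valued attributes of the transition; otherwise the edge is removed from the graph. :param source: Source state. :param dest: Destination state.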

Source code in ml4cps/automata/base.py
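def remove_transition(self, source, dest):
    """
    Remove the transition from source to dest. If a configuration is active, only the entries
    of that configuration are dropped from the dict-valued attributes of the transition;
    otherwise the edge is removed from the graph.
    :param source: Source state.
    :param dest: Destination state.
    """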
    if self.configuration is not None:
        tr = self.get_transition(source, dest)
        for v in tr[3].values():
            if isinstance(v, dict):
                v.pop(self.configuration, None)
    else:
        self._G.remove_edge(source, dest)

rename_events(prefix='e_')

Rename events to become e_0, e_1, ... (using the given prefix). The old id is stored in the field 'old_symbol' of the event data.

Source code in ml4cps/automata/base.py
def rename_events(self, prefix="e_"):
    """
    Rename events to become e_0, e_1, ... (using the given prefix). The old id is stored in the field
    'old_symbol' of the event data.
    """
    i = 0
    new_events_dict = OrderedDict()
    for k, v in self.Sigma.items():
        new_key = f'{prefix}{i}'
        new_value = v
        if new_value is None:
            new_value = {}
        new_value['old_symbol'] = k
        i += 1
        new_events_dict[new_key] = new_value
        for t in self.T.values():
            if t['event'] == k:
                t['event'] = new_key

step(q, x0, t, u)

Simulates one time step of continuous behavior from t to t+dt. The underlying function is solve_ivp with method 'RK23'. :param q: Current discrete state. :param x0: Initial state vector. :param t: Time at the start of the step. :param u: Extra arguments passed to the flow function by solve_ivp. :return: Time t+dt, value of the state at t+dt.

Source code in ml4cps/automata/base.py
def step(self, q, x0, t, u):
    """
    Simulates one time step of continuous behavior from t to t+dt. The underlying function is solve_ivp with method 'RK23'.
    :param q: Current discrete state.
    :param x0: Initial state vector.
    :param t: Time at the start of the step.
    :param u: Extra arguments passed to the flow function by solve_ivp.
    :return: Time t+dt, value of the state at t+dt.
    """
    s = solve_ivp(self.flow, t_span=(t, t + self.dt), y0=x0, method='RK23', args=u)
    xc = s.y[:, -1]
    t = s.t[-1]

    xk = self.time_discrete_dynamics(t, q, x0)
    return t, np.concatenate((xc, xk))
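To make the idea of advancing the continuous state by one step of length dt concrete without depending on SciPy, the sketch below swaps in a fixed-step classical Runge-Kutta scheme (RK4). This is not the adaptive RK23 method used by the listing above; `rk4_step` and `decay` are hypothetical names chosen for illustration.

```python
import math


def rk4_step(f, t, x, dt):
    """Advance dx/dt = f(t, x) by one fixed RK4 step; x is a list of floats."""
    def shifted(xs, a, ks):
        # Return xs + a * ks, element by element.
        return [xi + a * ki for xi, ki in zip(xs, ks)]

    k1 = f(t, x)
    k2 = f(t + dt / 2, shifted(x, dt / 2, k1))
    k3 = f(t + dt / 2, shifted(x, dt / 2, k2))
    k4 = f(t + dt, shifted(x, dt, k3))
    x_next = [xi + dt / 6 * (a + 2 * b + 2 * c + d)
              for xi, a, b, c, d in zip(x, k1, k2, k3, k4)]
    return t + dt, x_next


def decay(t, x):
    """Example flow: exponential decay, dx/dt = -x."""
    return [-xi for xi in x]


t1, x1 = rk4_step(decay, 0.0, [1.0], 0.1)
exact = math.exp(-0.1)  # analytic solution at t = 0.1 for comparison
```

As in the listing, the step returns the new time together with the state at that time.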

try_merge_states(state1, state2, try_fun=None)

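Merge state2 into state1 and update transitions. The merge is kept only if state1 stays deterministic (no two outgoing transitions share an event) and try_fun, when given, returns a truthy value for the merged automaton; otherwise the previous graph is restored.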

Source code in ml4cps/automata/base.py
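def try_merge_states(self, state1, state2, try_fun=None):
    """Merge state2 into state1 and update transitions.

    The merge is kept only if state1 stays deterministic (no two outgoing transitions share an event)
    and try_fun, when given, returns a truthy value for the merged automaton; otherwise the previous
    graph is restored.
    """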
    old_G = self._G
    make_final = state2 in self.final_q and state1 not in self.final_q
    make_initial = state2 in self.q0 and state1 not in self.q0
    self._G = nx.contracted_nodes(self._G, state1, state2)
    # make it deterministic again
    events = [x[3]['event'] for x in self.out_transitions(state1)]
    deterministic = len(set(events)) == len(events)

    if deterministic:
        if make_initial:
            self.add_initial_state(state1)
        if make_final:
            self.add_final_state(state1)

        if try_fun is not None and not try_fun(self):
            self._G = old_G
            if make_final:
                self.final_q.pop(state1)
            if make_initial:
                self.q0.pop(state1)
    else:
        self._G = old_G
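The determinism test used above reduces to checking that the events on the outgoing transitions of the merged state are pairwise distinct. A minimal standalone sketch, with `is_deterministic` as a hypothetical helper name:

```python
def is_deterministic(out_transitions):
    """Return True if no two outgoing transitions share the same event.

    Each transition is a (source, destination, key, data) tuple whose data
    dict carries the event under the "event" key, as in the listing above.
    """
    events = [data["event"] for (_src, _dst, _key, data) in out_transitions]
    return len(set(events)) == len(events)


ok = is_deterministic([("q0", "q1", "a", {"event": "a"}),
                       ("q0", "q2", "b", {"event": "b"})])
clash = is_deterministic([("q0", "q1", "a", {"event": "a"}),
                          ("q0", "q2", "a", {"event": "a"})])
```

In the second case the event `a` leads to two different destinations, so a merge producing this situation would be rolled back.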

CPSComponent

Bases: PythonModel, Simulator

General hybrid system class based on scipy and simpy.

Source code in ml4cps/cps/base.py
class CPSComponent(PythonModel, sim.Simulator):
    """
    General hybrid system class based on scipy and simpy.
    """

    def __init__(self, id, t=0, initial_q=(), initial_xt: list = (), initial_xk: list = (), dt=-1., p=None,
                 cont_state_names=None, discr_state_names=None, discr_output_names=None,
                 cont_output_names=None, unknown_state="Unknown"):
        sim.Simulator.__init__(self)
        PythonModel.__init__(self)

        self._parent_system = None
        self.decision_logic = None
        self.id = id
        self.dt = dt

        # State variables
        self._t = t
        self._q = initial_q
        self._xt = initial_xt
        self._xk = initial_xk

        # Other variables
        self._y = None
        self._u = None
        self._d = ()
        self._p = {} if p is None else p

        # simpy events
        self._pending_events = []
        self._block_event = None

        # List to append the past values
        self._past_p = []
        self._past_t = []
        self._discrete_state_data = []
        self._discrete_output_data = []
        self._continuous_state_data = []
        self._continuous_output_data = []

        # Variable names
        self.cont_state_names = cont_state_names
        self.cont_output_names = cont_output_names
        self.discrete_state_names = discr_state_names
        self.discrete_output_names = discr_output_names

        self.UNKNOWN_STATE = unknown_state

        self.output_y = None
        self.output_d = None



    # def copy(self):
    #     new_copy = CPSComponent(self.id, self._t, initial_q=self._q, initial_xt=self._xt,
    #                             initial_xk = self._xk, dt = self.dt, p=self._p,
    #     cont_state_names = copy.copy(self.cont_state_names), discr_state_names = copy.copy(self.discrete_state_names),
    #                             discr_output_names=copy.copy(self.discrete_output_names),
    #     cont_output_names=copy.copy(self.cont_output_names), unknown_state = copy.copy(self.UNKNOWN_STATE))
    #     return new_copy

    def copy(self):
        new_copy = copy.deepcopy(self)
        return new_copy

    @property
    def parent_system(self):
        return self._parent_system

    @property
    def full_id(self):
        full_id = self.id
        s = self.parent_system
        while s is not None and s.parent_system:
            full_id = s.id + "." + full_id
            s = s.parent_system
        return full_id

    @property
    def overall_system(self):
        s = self
        while s.parent_system:
            s = s.parent_system
        return s

    @property
    def state(self):
        """
        System state as a tuple (q, xt, xk): the discrete state, the time-continuous state
        and the time-discrete state.
        :return: Tuple (q, xt, xk).
        """
        return self._q, self._xt, self._xk

    @state.setter
    def state(self, value):
        self._q = value[0]
        self._xt = value[1]
        self._xk = value[2]

    @property
    def discrete_state(self):
        return self._q

    @discrete_state.setter
    def discrete_state(self, value):
        self._q = value

    def is_decision(self, state, overall_state):
        return False

    def __str__(self):
        return '{}: {}'.format(self.id, self._q)

    def _step_continuous(self, clock_start, clock_finish):
        return self.__step_continuous(clock_start, clock_finish)

    def simulation_process_simpy(self, env, max_time, verbose=False):
        """
        Registers the simulation processes of one component (CPSComponent) with the simpy environment:
        the discrete dynamics, the continuous dynamics and the discrete-time dynamics.

        Parameters
        ----------
        env : simpy.Environment
            The simpy environment used to synchronize multiple parallel processes during simulation.
        max_time : float
            The maximum simulation time.
        verbose : bool
            Whether to print execution logs (default is False).
        """
        env.process(self.discrete_dynamics_process(env, max_time, verbose=verbose))
        env.process(self.continuous_dynamics_process(env, max_time, verbose=verbose))
        env.process(self.discrete_time_dynamics_process(env, max_time, verbose=verbose))

    def discrete_event_dynamics(self, q, xt, xk, p) -> tuple:
        pass

    def discrete_dynamics_process(self, env, max_time, verbose=False):
        # self._env = env
        self._u = self.input(self._q, 0)
        if self._p is None:
            self._p = self.context(self._q, None, None, 0)
        self._past_t = [0]
        self._past_p = [self._p]


        self._discrete_state_data = []
        self._discrete_output_data = []

        while True:
            discr_state = dict(zip(self.discrete_state_names, self._q))
            if self.output_d is None:
                self._d = self.discrete_state
            else:
                self._d = self.output_d(self.discrete_state, self._xt, self._xk)
            self._discrete_state_data.append(dict(Time=env.now, **discr_state, **self._p))
            self._discrete_output_data.append([env.now, *self._d])

            # Stop the simulation if the max time is reached
            if env.now > max_time:
                if verbose:
                    print(f'Stop because of maximum time {max_time} is reached.')
                break
            # Stop if the stopping condition of the overall system is met
            elif self.overall_system.finish_condition():
                if verbose:
                    print('Stop because stop condition of the overall system is met.')
                break

            # DETERMINE THE NEXT EVENT
            # 1. CHECK IF TIMED EVENT
            try:
                event_delay, new_state_value = self.timed_event(*self.state)
            except Exception as ex:
                print(f"{self.id}: Exception during self.timed_event in state {self._q}")
                raise ex

            # 2. IF NOT TIMED THEN WAIT
            if event_delay is None:
                if verbose:
                    print('{}: Waiting in: {}'.format(self.id, self._q))
                if self._block_event is None:
                    self._block_event = env.event()
                    yield self._block_event
                else:
                    raise Exception("{}: State: {}. Exception during block event.".format(self.id, self._q))
                event = self._block_event.value
                self._block_event = None

                self._p['event'] = event
                new_q, xt, xk = self.discrete_event_dynamics(self._q, self._xt, self._xk, self._p)
                if verbose:
                    print(f'{self._q}->{event}->{new_q}')
                if xt is not None:
                    self._xt = xt
                if xk is not None:
                    self._xk = xk

                self._q = new_q
            else:
                if verbose:
                    print('Waiting in: {} for {}. New state: {}'.format(self._q, event_delay, new_state_value))
                yield env.timeout(event_delay)
                self._q = (new_state_value,)

            try:
                self.on_entry(self._q, self._p)
            except Exception as ex:
                print(f"{self.id}: Exception during discrete event dynamics '{self._q}'->")
                raise ex

    def discrete_time_dynamics_process(self, env, max_time, verbose=False):
        while (self._xk is not None and len(self._xk) > 0) or self.output_y is not None:
            self._continuous_state_data.append((self._t, *self._xk))
            self._continuous_output_data.append((self._t, *self.output_y(*self.state)))

            # Stop the simulation if the max time is reached
            if env.now > max_time:
                if verbose:
                    print(f'Stop because of maximum time {max_time} is reached.')
                break
            # Stop if the stopping condition of the overall system is met
            elif self.overall_system.finish_condition():
                if verbose:
                    print('Stop because stop condition of the overall system is met.')
                break

            yield env.timeout(self.dt)
            xk = self.time_discrete_dynamics(self._q[0], self._p, self._xk, self._u)
            self._xk = xk
            self._t = env.now
            grds = self.guards(self._q[0], self._xk)
            if grds is not None:
                self.apply_sim_event(grds, env)


    def guards(self, q, x):
        pass

    def continuous_dynamics_process(self, env, max_time, verbose=False):
        # try:
        #     if len(self._xt) > 0 or len(self._xk) > 0:  # there is time-continuous state variable
        #         self.__step_continuous(*self.state)
        # except Exception as ex:
        #     print_exc()
        #     raise Exception('Exception during self.__step_continuous in state {}'.format(self._q))

        # EXECUTE THE NEXT EVENT
        if False:
            yield
        # if verbose:
        #     print('--------------------')
        #
        # self._past_t.append(env.now)
        # self._past_p.append(self._p)
        #
        # self._discrete_state_data[-1]['Finish'] = env.now
        # discr_state = dict(zip(self.discrete_state_names, self._q))
        # self._discrete_state_data.append(
        #     dict(Time=env.now, Finish=None, **discr_state, **self._p))
        # self._discrete_output_data.append([env.now, *self._d])
        #
        # if verbose:
        #     print('{}: Time: {} State: {}'.format(self.id, env.now, self._q))
        #
        # if self._xt is not None:
        #     self._continuous_state_data.append([env.now, *self._xt, *self._xk])
        # if self._y is not None:
        #     self._continuous_output_data.append([env.now, *self._y])
        # if self._xt is not None:
        #     self._xt = self.__step_continuous(env.now - clock_start)
        # self._d, self._y = self.output(self._q, 0, self._xt, self._xk, self._u, self._p)
        #
        # print('--------------------')
        # print('{}: Simulation finished'.format(self.id))

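    def simulate(self, finish_time, verbose=False, reinitialize=True, save_choices=False):
        """Simulates behavior of the system until the finish_time is reached.

        Parameters
        ----------
        finish_time : Time when the simulation finishes.
        verbose : Whether the execution log should be printed in detail (default is False).
        reinitialize : Currently not evaluated in this method; the component is always reinitialized at time 0.
        save_choices : Whether the choices for each component should be saved to JSON files after
        the simulation is finished (default is False).

        Returns
        -------
        Tuple (discr_state_data, discr_output_data, cont_state_data, cont_output_data, finish time of the simulation).
        """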
        self.reinitialize(0)

        env = simpy.Environment()
        finish_time = float(finish_time)

        if verbose:
            print('Simulation started: ')

        self.simulation_process_simpy(env, finish_time, verbose)
        env.run(until=finish_time)

        discr_state_data = {}
        discr_output_data = {}
        cont_state_data = {}
        cont_output_data = {}

        try:
            if len(self._xt) > 0 and finish_time > self._t:  # there is a time-continuous state variable
                self._step_continuous(self._t, finish_time)
        except Exception as ex:
            print_exc()
            raise Exception('Exception during self._step_continuous in state {}'.format(self._q))

        try:
            self.finish(env.now)

            discr_state_data = pd.DataFrame(self._discrete_state_data).set_index("Time")
            discr_output_data = tools.data_list_to_dataframe(None, self._discrete_output_data,
                                                                   self.discrete_output_names, 'd')
            try:
                cont_state_data = tools.data_list_to_dataframe(self.id, self._continuous_state_data,
                                                               self.cont_state_names, 'x')
            except Exception as ex:
                print_exc()
            cont_output_data = tools.data_list_to_dataframe(self.id, self._continuous_output_data,
                                                                     self.cont_output_names, 'y')
        except Exception as ex:
            print_exc()
            warnings.warn('Simulation failed.')

        if verbose:
            print('Simulation finished.')
        return discr_state_data, discr_output_data, cont_state_data, cont_output_data, env.now

    def get_sim_state(self):
        return self._q, self._p, self._y, self._block_event

    def set_sim_state(self, q, e, p, y, block_event):
        self._q = q
        self._p = p
        self._y = y
        self._block_event = block_event

    def finish_condition(self):
        pass

    def reinitialize(self, t, state=None):
        if state is not None:
            self.state = state

        self._p = self.context(self._q, None, None, 0)
        self._u = self.input(self._q, 0)
        self._past_t = [t]
        self._past_p = [self._p]
        # self.output(self._q, 0, self._xt, self._xk, self._u, self._p)
        self._continuous_state_data = []
        self._continuous_output_data = []
        try:
            discr_state = dict(zip(self.discrete_state_names, self._q))
        except:
            raise Exception('Exception during discrete state initialization')

        self._discrete_state_data = [dict(Time=t, **discr_state, **self._p)]
        self._discrete_output_data = []
        self._block_event = None
        self._pending_events = []
        self._t = t

    def get_execution_data(self):
        try:
            data = pd.DataFrame(self._discrete_state_data) #, columns=['Time', 'Finish', 'State', 'Event'] + list(self._p.keys()))
        except ValueError as ve:
            warnings.warn(str(self._discrete_state_data))
            warnings.warn('Discrete state data not available due to value error.')
            for i, row in enumerate(self._discrete_state_data):
                for k, v in row.items():
                    if not is_scalar(v):
                        print(f"Non-scalar at row {i}, key '{k}'")
                        print("Type:", type(v))
                        print("Length:", len(v) if hasattr(v, "__len__") else "N/A")
                        break
            raise ve

        data['Finish'] = data['Time'].shift(-1)
        data['Duration'] = pd.to_timedelta(data['Finish'] - data['Time']).dt.total_seconds()
        return data

    # def clear_execution_data(self):
    #     self.discrete_state_data = None
    #     data = pd.DataFrame(self._discrete_state_data) #, columns=['Time', 'Finish', 'State', 'Event'] + list(self._p.keys()))
    #     data['Finish'] = data['Time'].shift(-1)
    #     data['Duration'] = pd.to_timedelta(data['Finish'] - data['Time']).dt.total_seconds()
    #     return data

    def update_input(self, u):  # Set the input u from outside
        self._u = u

    def output_d(self, q, xt, xk):
        return ()

    def output_y(self, q, xt, xk):
        return ()

    def input(self, q, clock) -> tuple:
        pass

    def context(self, q, past_t, past_p, t) -> tuple:
        return dict()

    def time_continuous_dynamics(self, t, xt, xk, q, u, p, y):
        pass

    def time_discrete_dynamics(self, q, p, x, u):
        pass

    def on_entry(self, q, context):
        pass

    def __step_continuous(self, clock_start, clock_finish):
        """
        Integrates the time-continuous dynamics starting at clock_start, with the solution evaluated
        every dt up to clock_finish. The underlying function is solve_ivp with method 'RK23'.
        :param clock_start: Time at the start of the simulated interval.
        :param clock_finish: Time at the end of the simulated interval.
        :return: None. The results are stored in self._xt and self._xk.
        """

        s = solve_ivp(self.time_continuous_dynamics, t_eval=np.arange(clock_start + self.dt, clock_finish + self.dt, self.dt),
                      t_span=(clock_start, clock_finish + self.dt), y0=self._xt, method='RK23',
                      args=(self._xk, self._q, self._u, self._p, self._y))

        self._xk = self.time_discrete_dynamics(self._q, self._p, self._xt, self._u)
        self._xt = s.y[:, -1]

        # self._continuous_state_data += np.concatenate([s.t.T[None], s.y]).T.tolist()

    def invariants(self, q, clock, xc, xd, y):
        pass

    def timed_event(self, q, xc, xd):
        """
        Calculates if and when the next timed event will happen and the new state value.
        :param q: Discrete-event part of the state.
        :param xc: Time-continuous part of the state.
        :param xd: Time-discrete part of the state.
        :return: Time delay of the timed event and the new state value, or (None, None) if no timed event is pending.
        """
        return None, None

    def wait(self, q):
        pass

    def apply_sim_event(self, e, env=None):
        """
        Applies the event e to this component's simpy execution. If the component is currently
        blocked waiting for an event, the pending block event is triggered with e as its value.
        :param e: Event to apply.
        :param env: The simpy environment.
        :return:
        """
        if not self._block_event:
            # self._block_event = self._env.now
            self._block_event = env.now
        # if not self._block_event:
        #     raise Exception("Block event is not set, but event is applied.")
        elif self._block_event.triggered:
            # pass
            # if self._e != e:
            raise Exception('Should not happen')
        else:
            self._block_event.succeed(value=e)

    def finish(self, t, **kwargs):
        if self._p is None:
            self._p = kwargs
        else:
            self._p.update(kwargs)
        self._t = t
        # self._e = None
        # self._q = None
        self._past_p.append(self._p)
        # self._discrete_state_data[-1]['Finish'] = t

    def get_alternatives(self, state, system_state):
        return None

    def get_decision_state(self, state, overall_state):
        return state

state property writable

System state is a vector of all state variables. :return:

__step_continuous(clock_start, clock_finish)

Simulates the continuous behavior from clock_start to clock_finish. The underlying function is solve_ivp with method 'RK23'. The result is stored in self._xt and self._xk.
:param clock_start: Time at the start of the simulated interval.
:param clock_finish: Time at the end of the simulated interval.
:return: None.

Source code in ml4cps/cps/base.py
def __step_continuous(self, clock_start, clock_finish):
    """
    Simulates the continuous behavior from clock_start to clock_finish. The underlying function is
    solve_ivp with method 'RK23'. The result is stored in self._xt and self._xk.
    :param clock_start: Time at the start of the simulated interval.
    :param clock_finish: Time at the end of the simulated interval.
    :return: None.
    """

    s = solve_ivp(self.time_continuous_dynamics, t_eval=np.arange(clock_start + self.dt, clock_finish + self.dt, self.dt),
                  t_span=(clock_start, clock_finish + self.dt), y0=self._xt, method='RK23',
                  args=(self._xk, self._q, self._u, self._p, self._y))

    self._xk = self.time_discrete_dynamics(self._q, self._p, self._xt, self._u)
    self._xt = s.y[-1]

apply_sim_event(e, env=None)

The event e is applied in this component's simpy execution, which means that the process must wait for an event.
:param e: Event to apply.

Source code in ml4cps/cps/base.py
def apply_sim_event(self, e, env=None):
    """
    The event e is applied in this component's simpy execution, which means that the process must wait for an event.
    :param e: Event to apply.
    :return:
    """
    if not self._block_event:
        # self._block_event = self._env.now
        self._block_event = env.now
    # if not self._block_event:
    #     raise Exception("Block event is not set, but event is applied.")
    elif self._block_event.triggered:
        # pass
        # if self._e != e:
        raise Exception('Should not happen')
    else:
        self._block_event.succeed(value=e)

simulate(finish_time, verbose=False, reinitialize=True, save_choices=False)

Simulates behavior of the system until the finish_time is reached.

Parameters

finish_time : Time when simulation finishes.
verbose : Should the execution log be printed in detail (default is False).
save_choices : Should the choices for each component be saved to json files after the simulation is finished (default is False).

Source code in ml4cps/cps/base.py
def simulate(self, finish_time, verbose=False, reinitialize=True, save_choices=False):
    """Simulates behavior of the system until the finish_time is reached.

    Parameters
    ----------
    finish_time : Time when simulation finishes.
    verbose : Should the execution log be printed in detail (default is False).
    save_choices : Should the choices for each component be saved to json files after
    the simulation is finished (default is False).
    """
    self.reinitialize(0)

    env = simpy.Environment()
    finish_time = float(finish_time)

    if verbose:
        print('Simulation started: ')

    self.simulation_process_simpy(env, finish_time, verbose)
    env.run(until=finish_time)

    discr_state_data = {}
    discr_output_data = {}
    cont_state_data = {}
    cont_output_data = {}

    try:
        if len(self._xt) > 0 and len(self._xt) > 0 and finish_time > self._t:  # there is time-continuous state variable
            self._step_continuous(self._t, finish_time)
    except Exception as ex:
        print_exc()
        raise Exception('Exception during self._step_continuous in state {}'.format(self._q))

    try:
        self.finish(env.now)

        discr_state_data = pd.DataFrame(self._discrete_state_data).set_index("Time")
        discr_output_data = tools.data_list_to_dataframe(None, self._discrete_output_data,
                                                               self.discrete_output_names, 'd')
        try:
            cont_state_data = tools.data_list_to_dataframe(self.id, self._continuous_state_data,
                                                           self.cont_state_names, 'x')
        except Exception as ex:
            print_exc()
        cont_output_data = tools.data_list_to_dataframe(self.id, self._continuous_output_data,
                                                                 self.cont_output_names, 'y')
    except Exception as ex:
        print_exc()
        warnings.warn('Simulation failed.')

    if verbose:
        print('Simulation finished.')
    return discr_state_data, discr_output_data, cont_state_data, cont_output_data, env.now

simulation_process_simpy(env, max_time, verbose=False)

The function simulates a single concurrent thread of one component (CPSComponent).

Parameters

env : simpy.Environment
    The simpy environment used to synchronize multiple parallel threads during simulation.
max_time : float
    The maximum time up to which the simulation runs.
verbose : bool
    Should the function print execution logs (default is False).

Source code in ml4cps/cps/base.py
def simulation_process_simpy(self, env, max_time, verbose=False):
    """
        The function simulates single concurrent thread of one component (CPSComponent).
        Parameters
        ----------
        env : simpy.Environment
            It is simpy environment used to synchronize multiple parallel threads during simulation.
        max_time : float
            It is the maximum time that the simulation will perform.
        verbose : bool
            Should the function print execution logs (default is False).
    """
    env.process(self.discrete_dynamics_process(env, max_time, verbose=verbose))
    env.process(self.continuous_dynamics_process(env, max_time, verbose=verbose))
    env.process(self.discrete_time_dynamics_process(env, max_time, verbose=verbose))

timed_event(q, xc, xd)

Calculates if and when the next timed event will happen and the new state values.
:param q: Discrete-event part of the state.
:param xc: Time-continuous part of the state.
:param xd: Time-discrete part of the state.
:return: Time delay of the timed event, new state value.

Source code in ml4cps/cps/base.py
def timed_event(self, q, xc, xd):
    """
    Calculates if and when the next timed event will happen and the new state values.
    :param q: Discrete-event part of the state.
    :param xc: Time-continuous part of the state.
    :param xd: Time-discrete part of the state.
    :return: Time delay of the timed event, new state value.
    """
    return None, None

automata

ml4cps.automata.learn

The module provides learning algorithms for creation of different kinds of automata.

Authors:

  • Nemanja Hranisavljevic, hranisan@hsu-hh.de, nemanja@ai4cps.com
  • Tom Westermann, tom.westermann@hsu-hh.de, tom@ai4cps.com

FnDetectChangePoints(xout, udout, xout_shifts)

Detects change points in output and input variables, filters them, and constructs a trace structure.

This function analyzes the provided output (xout) and input (udout) data to detect change points using the findChangePoints function. It processes both output and input variables, aggregates and filters the detected change points, and returns a structured trace dictionary containing the processed data and change point information.

Args:

  • xout (np.ndarray): Output data array of shape (n_samples, n_outputs).
  • udout (np.ndarray): Input data array of shape (n_samples, n_inputs).
  • xout_shifts (np.ndarray): Shifted output data array, used for further processing.

Returns:

dict: A dictionary with the following keys:

  • 'x': Filtered output data array.
  • 'xs': Filtered shifted output data array.
  • 'chpoints': Array of global change points detected across all variables.
  • 'chpoints_per_var': List of arrays, each containing change points for a specific variable.
  • 'ud': Filtered input data array.
  • 'labels_num': Empty list (reserved for numeric labels).
  • 'labels_trace': Empty list (reserved for trace labels).

Notes:

  • Relies on global variables: num_var, num_ud, max_deriv, and chp_depths.
  • Uses external functions: findChangePoints and filterChangePoints.
  • The function is intended for use in time-series or sequential data analysis where detecting significant changes in variable values is required.

FnShiftAndDiff(xout, udout, norm_coeff, num_var, num_ud, max_deriv, Ts)

Normalizes and processes state and input data by generating shifted versions and computing derivatives.

This function performs the following operations:

  1. Normalizes the state output xout using the provided normalization coefficients.
  2. Generates shifted duplicates of xout up to the specified maximum derivative order.
  3. Computes numerical derivatives of xout up to max_deriv order and appends them to the state data.
  4. Strips the initial entries from xout and the shifted data to account for the derivative computation.
  5. Normalizes the input data udout (if present) and strips initial entries to match the processed state data.

Args:

  • xout (np.ndarray): State output data of shape (n_samples, num_var).
  • udout (np.ndarray): Input data of shape (n_samples, num_ud).
  • norm_coeff (np.ndarray): Normalization coefficients of shape (num_var + num_ud, 1).
  • num_var (int): Number of state variables.
  • num_ud (int): Number of input variables.
  • max_deriv (int): Maximum order of derivatives to compute.
  • Ts (float): Sampling time interval.

Returns:

Tuple[np.ndarray, np.ndarray, pd.DataFrame]:

  • xout (np.ndarray): Processed and augmented state data with derivatives, shape (n_samples - max_deriv, ...).
  • udout (np.ndarray): Normalized input data, shape (n_samples - max_deriv, num_ud).
  • xout_shifts (pd.DataFrame): Shifted duplicates of the normalized state data, shape (n_samples - max_deriv, ...).

Notes:

  • The function assumes that xout and udout are NumPy arrays and that xout has at least max_deriv rows.
  • The normalization coefficients should be provided for both state and input variables.
  • The function uses zero-padding for shifted and derivative computations.
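The derivative and stripping steps described for FnShiftAndDiff can be illustrated on a single variable with plain Python lists. This is a minimal sketch, not the library's implementation: the helper name `finite_differences` is hypothetical, backward differences divided by Ts are an assumed scheme (the docstring does not name one), and normalization and the shifted duplicates are left out.

```python
def finite_differences(x, max_deriv, Ts):
    """Return rows [x, dx, d2x, ...] for one variable, without the first max_deriv samples.

    Hypothetical helper: backward differences are an assumed derivative scheme.
    """
    columns = [list(x)]
    for _ in range(max_deriv):
        prev = columns[-1]
        # Zero-pad the first entry so that every column keeps the same length.
        diff = [0.0] + [(prev[i] - prev[i - 1]) / Ts for i in range(1, len(prev))]
        columns.append(diff)
    # Drop the leading rows whose derivatives are affected by the zero-padding.
    rows = [list(r) for r in zip(*columns)]
    return rows[max_deriv:]


rows = finite_differences([0.0, 1.0, 4.0, 9.0, 16.0], max_deriv=2, Ts=1.0)
print(rows)
```

For the quadratic input above, each remaining row holds the value, its first difference, and a constant second difference.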

FnTraceToTrainingData(trace, num_var, num_ud, useTime)

Converts a trace dictionary into training data suitable for machine learning models.

Parameters:

  • trace (dict): A dictionary containing the following keys:
      • 'x' (np.ndarray): Array of system variables over time (shape: [timesteps, num_var]).
      • 'ud' (np.ndarray): Array of user-defined variables over time (shape: [timesteps, num_ud]).
      • 'labels_trace' (np.ndarray): Array of state labels for each segment.
      • 'chpoints' (list or np.ndarray): Indices indicating change points (state switches) in the trace.
  • num_var (int): Number of system variables to include in the feature vector.
  • num_ud (int): Number of user-defined variables to include in the feature vector.
  • useTime (bool): If True, includes the time since the last state switch as a feature.

Returns:

  • X (np.ndarray): Feature matrix where each row corresponds to a time step and contains:
      • Current state label
      • System variables
      • User-defined variables (if num_ud > 0)
      • Time since last state switch (if useTime is True)
  • Y (np.ndarray): Array of next state labels (class labels) for each feature vector in X.
  • states (np.ndarray): Array of state labels for each time step in the trace.

Notes:

  • The function skips the last time step in the trace for feature construction, as it cannot form a (X, Y) pair.
  • The function assumes that the trace data is properly aligned and that 'chpoints' and 'labels_trace' are consistent.
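The feature layout described for FnTraceToTrainingData can be sketched with plain lists. The function below is a hypothetical illustration, not the library code. It assumes that 'chpoints' contains both boundary indices (0 and the trace length), that 'labels_trace' holds one label per segment, and that the time since the last switch is counted in samples. User-defined inputs are omitted.

```python
def trace_to_training_data(x, labels_trace, chpoints, use_time=True):
    """Build (X, Y, states) from one trace; a sketch under the assumptions stated above."""
    states, since_switch = [], []
    # Expand the per-segment labels to one label per time step.
    for seg, (start, end) in enumerate(zip(chpoints, chpoints[1:])):
        for k in range(start, end):
            states.append(labels_trace[seg])
            since_switch.append(k - start)
    X, Y = [], []
    # The last time step has no successor, so it cannot form an (X, Y) pair.
    for k in range(len(states) - 1):
        row = [states[k]] + list(x[k])
        if use_time:
            row.append(since_switch[k])
        X.append(row)
        Y.append(states[k + 1])
    return X, Y, states


X, Y, states = trace_to_training_data(
    x=[[0.1], [0.2], [0.9], [1.0]], labels_trace=[1, 2], chpoints=[0, 2, 4]
)
print(X, Y, states)
```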

build_pta(data, event_col='event', boundaries=1)

Builds a Prefix Tree Acceptor (PTA) from a collection of event sequences.

This function constructs a PTA by iterating through each sequence of events in the provided data. It adds states and transitions to the automaton based on the observed event sequences, and sets the depth, in-degree, and out-degree of the states. The PTA is useful for learning automata from positive examples.

Args:

  • data (iterable): An iterable of event sequences. Each sequence can be a pandas DataFrame, pandas Series, or string. If a DataFrame, it should contain at least a time column and an event column.
  • event_col (str, optional): The name of the column containing event labels in the input DataFrame or Series. Defaults to 'event'.
  • boundaries (int or dict, optional): Not currently used in the function, but intended for handling event boundaries or timing constraints. Defaults to 1.

Returns:

Automaton: The constructed Prefix Tree Acceptor representing the input event sequences.

Notes:

  • The function expects the presence of an Automaton class with methods for adding states, transitions, and final states.
  • If a sequence is a string, it is converted to a pandas Series of characters.
  • Timing information (dt) is calculated as the difference between consecutive time steps.
  • The function skips empty sequences.
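The idea of a prefix tree acceptor can be shown without the Automaton class. The sketch below is a simplified stand-in for build_pta: the name `build_prefix_tree`, the integer state ids, and the dictionary representation are assumptions made for illustration. It handles only string sequences and ignores timing, depth, and degree bookkeeping.

```python
def build_prefix_tree(sequences):
    """Build a prefix tree acceptor from strings.

    Returns (transitions, final_states). States are integers and 0 is the root.
    """
    transitions = {}       # (state, symbol) -> next state
    final_states = set()
    next_id = 1
    for seq in sequences:
        if not seq:        # empty sequences are skipped
            continue
        state = 0
        for symbol in seq:
            key = (state, symbol)
            if key not in transitions:
                # A new prefix was observed, so a new state is created for it.
                transitions[key] = next_id
                next_id += 1
            state = transitions[key]
        final_states.add(state)
    return transitions, final_states


transitions, final_states = build_prefix_tree(["ab", "ac", "abd"])
print(transitions, final_states)
```

Sequences that share a prefix share the corresponding path, so "ab" and "abd" differ only by one additional transition.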

computeDistance(der)

Computes a distance metric over a sliding window for a given derivative array.

For each position in the input array der, the function calculates the sum of absolute differences between two windows of size windowSize before and after the current position, after normalizing each window by subtracting its first element. The result is an array of distances.

Parameters:

  • der (np.ndarray): The input array (e.g., derivative values) over which to compute the distance.

Returns:

np.ndarray: An array containing the computed distances for each valid position.
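A minimal sketch of the windowed distance described for computeDistance follows. It is not the library code: the name `window_distance` is hypothetical, the window size is passed as an argument (the documentation does not show how computeDistance obtains windowSize), and the exact range of valid positions is an assumption.

```python
def window_distance(der, window_size):
    """Sum of absolute differences between the normalized windows before and after each position."""
    distances = []
    for i in range(window_size, len(der) - window_size + 1):
        before = der[i - window_size:i]
        after = der[i:i + window_size]
        # Normalize each window by subtracting its first element.
        before = [v - before[0] for v in before]
        after = [v - after[0] for v in after]
        distances.append(sum(abs(a - b) for a, b in zip(after, before)))
    return distances


signal = [0, 0, 0, 0, 1, 2, 3, 4]   # flat segment followed by a ramp
print(window_distance(signal, window_size=2))
```

The distance is nonzero only around the position where the slope changes, which is what makes its peaks usable as change point candidates.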

filterChangePoints(xout, udout, xout_shifts, chpoints, chp_var)

Filters and synchronizes detected changepoints in time series data across multiple variables.

This function processes global and local changepoint indices to ensure consistency and remove redundant or closely spaced changepoints. It updates the provided data arrays and changepoint lists accordingly.

Args:

  • xout (np.ndarray): Output variable time series data (samples x variables).
  • udout (np.ndarray): Input variable time series data (samples x variables), or an empty array if not used.
  • xout_shifts (np.ndarray): Shifted output variable data (samples x variables).
  • chpoints (list or np.ndarray): List of global changepoint indices.
  • chp_var (list of np.ndarray): List containing arrays of changepoint indices for each variable.

Returns:

tuple:

  • xout (np.ndarray): Filtered output variable data.
  • udout (np.ndarray): Filtered input variable data.
  • xout_shifts (np.ndarray): Filtered shifted output variable data.
  • chpoints (np.ndarray): Filtered and synchronized global changepoint indices.
  • chp_var (list of np.ndarray): Updated list of changepoint indices for each variable.

Notes:

  • Uses global variables: windowSize, num_var, num_ud.
  • Assumes that changepoints are sorted and that there are at least two changepoints.
  • The function modifies chp_var in place.

filterindx(indx, windw)

Filters out indices from the input array that are within a specified window of each other.

Given a sorted array of indices, this function removes any index that is within windw distance from its predecessor, keeping only the first occurrence in each window.

Parameters:

  • indx (array-like): A sorted array or list of integer indices.
  • windw (int): The minimum allowed distance between consecutive indices.

Returns:

numpy.ndarray: The filtered array of indices, where no two indices are within windw of each other.

Example:

>>> filterindx(np.array([1, 2, 3, 10, 12]), 2)
array([ 1, 10, 12])
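The filtering rule can be sketched in pure Python. The function `filter_indices` below is a hypothetical re-implementation, not the library's filterindx. Comparing each index with its immediate predecessor in the input and keeping it when the gap is at least windw are assumptions chosen so that the sketch reproduces the documented example.

```python
def filter_indices(indx, windw):
    """Keep the first index and every index at least windw away from its predecessor."""
    indx = list(indx)
    if not indx:
        return []
    kept = [indx[0]]
    for prev, cur in zip(indx, indx[1:]):
        # An index that closely follows its predecessor belongs to the same window.
        if cur - prev >= windw:
            kept.append(cur)
    return kept


print(filter_indices([1, 2, 3, 10, 12], 2))
```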

findChangePoints(xout, depth, starting, ending, max_depth)

Recursively detects change points in a multi-dimensional signal using a hierarchical approach.

This function analyzes a segment of the input array xout at a given depth (dimension), computes a distance metric to identify potential change points (peaks), and then recursively searches for further change points in subsegments at deeper levels. The recursion stops when the maximum depth is reached or the segment is too small.

Parameters

xout : np.ndarray
    The input array containing the signal or features to analyze. Expected shape is (n_samples, n_features).
depth : int
    The current depth (dimension) being analyzed.
starting : int
    The starting index of the segment to analyze.
ending : int
    The ending index (exclusive) of the segment to analyze.
max_depth : int
    The maximum depth (dimension) to analyze.

Returns

np.ndarray
    An array of indices representing detected change points within the specified segment.

Notes

  • Uses global variables windowSize and chp_depths for windowing and tracking change points per depth.
  • Utilizes computeDistance, find_peaks, and filterindx helper functions.
  • At the top level (depth == 0), prepends and appends boundary indices to the result.

rpni(positive_samples, negative_samples)

Implements the RPNI (Regular Positive and Negative Inference) algorithm for learning a DFA from positive and negative samples.

Args:

  • positive_samples (Iterable[str]): A collection of strings that should be accepted by the learned DFA.
  • negative_samples (Iterable[str]): A collection of strings that should be rejected by the learned DFA.

Returns:

DFA: A deterministic finite automaton (DFA) that accepts all positive samples and rejects all negative samples.

Notes:

  • The function first constructs a Prefix Tree Acceptor (PTA) from the positive samples.
  • It then attempts to merge states in the DFA, ensuring that no negative sample is accepted after each merge.
  • The merging process is guided by the constraint that all negative samples must be rejected.

simple_learn_from_event_logs(data, initial=True, count_repetition=True, verbose=False)

Simple algorithm to learn a timed automaton from event log data.

This function constructs a timed automaton by iterating over sequences of timestamped events. Each event sequence is treated as a trace, and transitions are created between states based on event occurrences and their timing. States are determined by the emitted events, optionally including repetition counts. The automaton can be initialized with an explicit initial state, and transitions can account for repeated events.

Args:

  • data (list or pandas.Series): A list of event sequences, where each sequence is a pandas Series with timestamps as indices and event labels as values.
  • initial (bool, optional): If True, adds an explicit 'initial' state to the automaton. Defaults to True.
  • count_repetition (bool, optional): If True, distinguishes states and transitions by counting consecutive repetitions of the same event. Defaults to True.
  • verbose (bool, optional): If True, prints detailed information about the learning process. Defaults to False.

Returns:

Automaton: The learned timed automaton object.

Notes:

  • Each sequence in data should be a pandas Series indexed by timestamps.
  • If a sequence contains fewer than two events, it is skipped.
  • The function assumes the existence of an Automaton class with add_initial_state and add_single_transition methods.
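The core of this learning scheme can be sketched with standard-library data structures. The code below is a simplified illustration and not the library's algorithm: `learn_transitions` is a hypothetical name, sequences are lists of (timestamp, event) tuples instead of pandas Series, each state is named after the last emitted event, repetition counting is omitted, and recording a delay of 0.0 for the transition out of the 'initial' state is an assumption.

```python
from collections import defaultdict


def learn_transitions(sequences, initial=True):
    """Collect observed delays per (source, event, destination) transition."""
    timings = defaultdict(list)
    for seq in sequences:
        if len(seq) < 2:          # sequences with fewer than two events are skipped
            continue
        t_prev, e_prev = seq[0]
        if initial:
            # Assumed convention: the first event leaves an explicit 'initial' state.
            timings[("initial", e_prev, e_prev)].append(0.0)
        for t, e in seq[1:]:
            timings[(e_prev, e, e)].append(t - t_prev)
            t_prev, e_prev = t, e
    return dict(timings)


logs = [
    [(0.0, "start"), (2.0, "heat"), (5.0, "stop")],
    [(0.0, "start"), (3.0, "heat"), (7.0, "stop")],
    [(0.0, "start")],
]
model = learn_transitions(logs)
print(model)
```

The collected delays per transition are the raw material from which timing constraints of a timed automaton could be estimated.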

simple_learn_from_signal_updates(data, sig_names, initial=True, verbose=False)

Learns a timed automaton from sequences of signal updates.

This function processes a list of dataframes, each representing a sequence of signal updates over time. For each sequence, it constructs states based on the values of the specified signals and adds transitions to an Automaton object whenever a signal value changes.

Args:

  • data (list of pandas.DataFrame): List of dataframes, each containing time-stamped signal updates. The first column is assumed to be the time column, and subsequent columns correspond to signal names.
  • sig_names (list of str): List of signal names to track and use for state construction.
  • initial (bool, optional): If True, adds an initial state to the automaton for each sequence. Defaults to True.
  • verbose (bool, optional): If True, prints detailed information about the learning process. Defaults to False.

Returns:

Automaton: The learned automaton with states and transitions based on the observed signal updates.

Notes:

  • Each state is represented as a dictionary mapping signal names to their current values.
  • Transitions are added only when all signal values are set (i.e., not None).
  • The event label for each transition is formatted as '<-'.
  • The transition is annotated with the time difference (delta_t) between consecutive events.

simple_learn_from_signal_vectors(data, config=None, drop_no_changes=True, verbose=False)

Learns a timed automaton from a list of signal vector dataframes.

This function processes sequences of signal vectors (as pandas DataFrames), detects changes in the specified signal columns, and constructs a timed automaton by adding transitions for each detected event.

Args:

  • data (list of pandas.DataFrame): List of DataFrames, each representing a sequence of signal vectors. The first column is assumed to be the time column, and the remaining columns are signal values.
  • sig_names (list of str): List of column names in the DataFrame that correspond to the signals to be considered for state transitions.
  • drop_no_changes (bool, optional): If True, rows where no signal changes occur are dropped before processing. Default is True.
  • verbose (bool, optional): If True, prints detailed information about the learning process. Default is False.

Returns:

Automaton: An Automaton object constructed from the observed transitions in the input data.

Notes:

  • Each transition in the automaton corresponds to a change in the signal vector, with the event label representing the difference between consecutive signal vectors and the transition time as the time delta.
  • The function assumes that the Automaton class and its add_single_transition method are defined elsewhere.

ml4cps.automata.base

The module provides a class Automaton which inherits CPSComponent and implements the dynamics of different kinds of automata.

Authors:

  • Nemanja Hranisavljevic, hranisan@hsu-hh.de, nemanja@ai4cps.com
  • Tom Westermann, tom.westermann@hsu-hh.de, tom@ai4cps.com

Automaton

Bases: CPSComponent

Automaton class is the main class for modeling various kinds of hybrid systems.

num_modes property

Returns the number of modes in the automaton. :return: number of states.

num_transitions property

Returns the number of transitions in the automaton. :return: number of transitions.

state property writable

Automata discrete state is uni-variate. :return:

__init__(states=None, transitions=None, configuration=None, unknown_state='raise', id='', initial_q=(), initial_r=None, final_q=(), super_states=(), decision_states=(), **kwargs)

Class initialization from lists of elements.
:param states: Discrete states / modes of continuous behavior.
:param events: The events that trigger state transitions.
:param transitions: The transition information. If a collection of dicts, each dict should contain "source", "dest" and "event"; the other attributes will be added as data of that transition. Alternatively, a collection of tuples of the form (source, event, dest, *) can be used.
:param unknown_state: The name of unknown states during "play in"; if "raise", an exception will be raised.

__str__()

String representation of the automaton. :return:

add_final_state(states)

Add final state(s) of the automaton. :param states: States to add.

add_initial_state(states)

Add initial state(s) of the automaton. :param states: States to add.

add_state(new_state, enabled=None, **kwargs)

Add state to the automaton. :param new_state: State to be added.

add_state_data(s, d)

Add data to the state s of the automaton. :param s: State. :param d: Data to be added to s. :return:

add_states_from(new_state, enabled=None, **kwargs)

Add multiple states to the automaton. :param new_state: States to be added.

add_transition(s, d, e, enabled=None, guard=None, **other)

Add a single transition. :param s: Source state. :param d: Destination state. :param e: Event.

add_transitions_from(list_of_tuples, **other)

Add multiple transitions. :param list_of_tuples: List of transitions in the form (source_state, destination_state, event, ......).

flow(q, p, x, u)

Flow equation gives derivative of the continuous variables. :param q: Current discrete state of the model. :param p: Stochastic parameters generated on entry to state current_q. :param x: Current continuous state of the model. :param u: Calculated internal i signals. :return: Derivative of the continuous state x.

get_num_in(q)

Returns the number of in transitions of state q in the automaton. :return: number of transitions.

get_num_out(q)

Returns the number of out transitions of state q in the automaton. :return: number of transitions.

get_transition(s, d=None, e=None, if_more_than_one='raise')

Get all transitions with source state s and destination state d. When e is provided, the returned list contains only transitions whose event is e. :param if_more_than_one: :param s: Source state. :param d: Destination state. :param e: Event. :return:

in_transitions(s, event=None)

Get all incoming transitions of state s. :param s: :return:

inv(t, q, x, y, z, p)

Invariants. :param t: :param q: :param x: :param y: :param z: :param p:

is_transition(s, d, e)

Check if a transition (s,d,e) exists in the automaton. :param s: Source. :param d: Destination. :param e: Event. :return:

merge(q1, q2)

If two states are compatible, they are merged with the function merge. The transitions of the automaton, the in- and outdegree of the states and the number of transitions happening are adjusted.

out_transitions(s, event=None)

Get all outgoing transitions of state s. :param event: :param s: :return:

print_state(v)

Prints outgoing transitions of a state v.

Parameters:

Name Type Description Default
v state
required

Returns:

Name Type Description
String

Description of the outgoing transitions of the state.

remove_transition(source, dest)

Remove the transition(s) from source to dest. :param source: :param dest: :return:

rename_events(prefix='e_')

Rename events to become e_0, e_1... The old id is stored in the field 'old_symbol' of the state data.

step(q, x0, t, u)

Simulates one time step of continuous behavior from t to t+dt. The underlying function is solve_ivp with method 'RK23'. :param x0: Initial state vector. :param t: Time at start of the step simulation. :param u: Arguments passed..... :return: Time t+dt, value of state at t+dt

try_merge_states(state1, state2, try_fun=None)

Merge state2 into state1 and update transitions.

cps

ml4cps.cps.base

The module provides base classes to represent the dynamics of cyber-physical systems (CPS): CPS and CPSComponent.

Author: Nemanja Hranisavljevic, hranisan@hsu-hh.de

CPS

CPS class represents a cyber-physical system. It is used to model the hierarchy in the system as each CPS can contain a mixture of other CPS and CPS components. The leaves of the hierarchy are only CPS component objects which define their dynamics and communication with other components.

Attributes:

Name  Type         Description
id    str          Identifier of the object.
com   OrderedDict  Property. A collection of components.

parent_system property writable

Gets the parent system by accessing _parent_system private attribute.

Returns:

Type  Description
CPS   The parent system.

__getitem__(key)

Gets the component with the given key in the collection.

Parameters:

Name  Type    Description                                                                      Default
key   string  The key of the component to retrieve. Must be a hashable type (e.g., int, str).  required

Returns: (CPS, CPSComponent): Returned component or subsystem.

__init__(sys_id, components)

Initializes CPS with the given attributes.

Parameters:

Name        Type      Description                      Default
sys_id      str       ID of the system.                required
components  iterable  Child components of the system.  required

get_all_components(exclude=None)

Returns a list of all child components (not only direct) except those with ids possibly provided in exclude.

Parameters

exclude : iterable A list of component ids to exclude in the returned list.
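The difference between direct and transitive children can be illustrated with a small stand-in hierarchy. This sketch does not use the CPS class: `Node` and `all_components` are hypothetical names, and the traversal order as well as the decision to still descend into an excluded node are assumptions, since the documentation does not specify them.

```python
class Node:
    """Hypothetical stand-in for a CPS or CPSComponent with an id and children."""

    def __init__(self, id, children=()):
        self.id = id
        self.children = list(children)


def all_components(system, exclude=None):
    """Return all descendants of system (depth first), skipping the excluded ids."""
    exclude = set(exclude or ())
    found = []
    for child in system.children:
        if child.id not in exclude:
            found.append(child)
        # Recurse so that indirect children are collected as well.
        found.extend(all_components(child, exclude))
    return found


plant = Node("plant", [Node("line", [Node("robot"), Node("conveyor")]), Node("sensor")])
ids = [c.id for c in all_components(plant, exclude=["conveyor"])]
print(ids)
```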

get_components(exclude=None)

Returns a list of all direct child components except those with ids possibly provided in exclude.

Parameters

exclude : iterable A list of component ids to exclude in the returned list (default is None).

reinitialize(t=0, state=None)

The function re-initializes the CPS components of this CPS with the given state values. :param t: Current time to set to the components. :param state: State of the CPS (its components) given as a dictionary of dictionaries ... of tuples (according to the CPS hierarchy). The tuples are created by concatenating the values of the discrete-event state variables, time-continuous state variables and time-discrete state variables.

set_child_component(id, com)

Set component with the id.

Parameters

id : str
    ID of the component to add.
com : (CPS, CPSComponent)
    Component or subsystem to add.

simulate(finish_time, verbose=False, save_choices=False)

Simulates behaviour of the system until the finish_time is reached.

Parameters

finish_time : Time when simulation finishes.
verbose : Should the execution log be printed in detail (default is False).
save_choices : Should the choices for each component be saved to json files after the simulation is finished (default is False).

CPSComponent

Bases: PythonModel, Simulator

General hybrid system class based on scipy and simpy.

state property writable

System state is a vector of all state variables. :return:

__step_continuous(clock_start, clock_finish)

Simulates the continuous behavior from clock_start to clock_finish. The underlying function is solve_ivp with method 'RK23'. The result is stored in self._xt and self._xk.
:param clock_start: Time at the start of the simulated interval.
:param clock_finish: Time at the end of the simulated interval.
:return: None.

apply_sim_event(e, env=None)

The event e is applied in this component's simpy execution, which means that the process must wait for an event.
:param e: Event to apply.

simulate(finish_time, verbose=False, reinitialize=True, save_choices=False)

Simulates behavior of the system until the finish_time is reached.

Parameters

finish_time : Time when simulation finishes.
verbose : Should the execution log be printed in detail (default is False).
save_choices : Should the choices for each component be saved to json files after the simulation is finished (default is False).

simulation_process_simpy(env, max_time, verbose=False)

The function simulates a single concurrent thread of one component (CPSComponent).

Parameters

env : simpy.Environment
    The simpy environment used to synchronize multiple parallel threads during simulation.
max_time : float
    The maximum time up to which the simulation runs.
verbose : bool
    Should the function print execution logs (default is False).

timed_event(q, xc, xd)

Calculates if and when the next timed event will happen and the new state values.
:param q: Discrete-event part of the state.
:param xc: Time-continuous part of the state.
:param xd: Time-discrete part of the state.
:return: Time delay of the timed event, new state value.
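The base implementation of timed_event returns (None, None), so a subclass is expected to override it. The sketch below shows the shape such an override might take. To stay runnable without ml4cps, `TimerSketch` is a hypothetical stand-alone class rather than a real CPSComponent subclass. The dwell-time table and the choice to return the next discrete state as the "new state value" are assumptions.

```python
class TimerSketch:
    """Stand-in for a CPSComponent subclass; only timed_event is modelled."""

    # Hypothetical dwell times and successor states per discrete state.
    DWELL = {"heating": (30.0, "idle")}

    def timed_event(self, q, xc, xd):
        """Return (delay, new state) if a timed event is scheduled in q, else (None, None)."""
        if q in self.DWELL:
            delay, next_q = self.DWELL[q]
            return delay, next_q
        # Same default as the base implementation: no timed event is pending.
        return None, None


component = TimerSketch()
print(component.timed_event("heating", xc=[], xd=[]))
print(component.timed_event("idle", xc=[], xd=[]))
```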

ml4cps.cps.sim

Simulation.

Author: Nemanja Hranisavljevic, hranisan@hsu-hh.de, nemanja@ai4cps.com

Simulator

stop_condition(t)

Simulation stop condition which can be overridden by the subclasses.
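
A minimal sketch of overriding the stop condition in a subclass. The docstring does not state the return convention, so the sketch assumes that the method receives the current simulation time and that a truthy return value ends the run. `BaseSimulator` and its `run` loop are hypothetical stand-ins written for this example, not the ml4cps `Simulator` class.

```python
class BaseSimulator:
    """Hypothetical stand-in for a simulator base class (not ml4cps.cps.sim.Simulator)."""

    def stop_condition(self, t):
        # Default behaviour in this sketch: never request an early stop.
        return False

    def run(self, finish_time, dt=1.0):
        # Toy loop: advance time until finish_time or until stop_condition fires.
        t = 0.0
        while t < finish_time and not self.stop_condition(t):
            t += dt
        return t


class StopAfterFive(BaseSimulator):
    def stop_condition(self, t):
        # Assumed convention: a truthy return value ends the simulation.
        return t >= 5.0


final_time = StopAfterFive().run(finish_time=100.0)
```

With this convention the subclass ends the toy loop long before `finish_time` is reached, while the base class runs to the end.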

discretization

ml4cps.discretization.discretization

The module provides base classes to represent the dynamics of cyber-physical systems (CPS): CPS and CPSComponent.

Author: Nemanja Hranisavljevic, hranisan@hsu-hh.de

EqualFrequencyDiscretizer

Bases: TimeSeriesDiscretizer

A class that implements the equal-frequency interval (EFI) discretization method. It divides each variable (column) into intervals such that each interval contains approximately the same number of data points.

discretize(df, return_str=False, append_discr=None)

Discretize data into equal-frequency intervals. :param df: Data to discretize. :param return_str: Whether to return discretized data as a concatenated string per row. :return: Discretized data.

train(data, number_of_intervals=10)

Estimate model parameters, thresholds that divide each variable into equal-frequency intervals. :param data: Data to calculate model parameters from. :param number_of_intervals: Number of equal-frequency intervals per variable. :return:
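
The idea behind equal-frequency thresholds can be sketched for a single variable in plain Python. This is an illustration only, not the ml4cps implementation: the helper names are hypothetical, and the quantile interpolation rule (`method="inclusive"`) is an assumption.

```python
from bisect import bisect_right
from statistics import quantiles


def equal_frequency_thresholds(values, number_of_intervals=10):
    # The n-1 cut points that split the sorted values into n roughly equal-sized groups.
    return quantiles(values, n=number_of_intervals, method="inclusive")


def discretize(values, thresholds):
    # Interval index = number of thresholds that are <= the value.
    return [bisect_right(thresholds, v) for v in values]


values = [1, 2, 3, 4, 5, 6, 7, 8, 100]
thresholds = equal_frequency_thresholds(values, number_of_intervals=3)
labels = discretize(values, thresholds)
```

In this example each of the three intervals receives three points, even though the value 100 lies far from the rest.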

EqualWidthDiscretizer

Bases: TimeSeriesDiscretizer

A class that implements the equal-width interval (EWI) discretization method. It divides the range of every variable (column) of the input data into a predefined number of equal-width intervals.

discretize(df, return_str=False, append_discr=None)

Discretize data into equal-width intervals. :param df: Data to discretize. :return: Discretized data.

train(data, number_of_intervals=10)

Estimate model parameters, thresholds that divide each variable into equal-width intervals. :param data: Data to calculate model parameters from. :param number_of_intervals: Number of equal-width intervals per variable. :return:
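
For comparison, equal-width thresholds for one variable can be sketched as follows. The helper names are hypothetical, and the handling of values that fall exactly on a boundary is an assumption; the library may differ.

```python
def equal_width_thresholds(values, number_of_intervals=10):
    lo, hi = min(values), max(values)
    width = (hi - lo) / number_of_intervals
    # Interior boundaries only: n intervals need n-1 thresholds.
    return [lo + i * width for i in range(1, number_of_intervals)]


def discretize(values, thresholds):
    # Interval index = number of thresholds that the value reaches or exceeds.
    return [sum(v >= t for t in thresholds) for v in values]


values = [0.0, 1.0, 2.0, 3.0, 10.0]
thresholds = equal_width_thresholds(values, number_of_intervals=5)
labels = discretize(values, thresholds)
```

Here the single large value stretches the range, so two of the five intervals stay empty. This is the typical trade-off against equal-frequency intervals.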

KMeansDiscretizer

Bases: TimeSeriesDiscretizer

A class that implements K-means discretization. It clusters the data values of each variable into a predefined number of clusters using the K-means algorithm.

discretize(df, return_str=False, append_discr=None)

Discretize data into clusters determined by K-means. :param df: DataFrame to discretize. :param return_str: Whether to return discretized data as concatenated strings. :return: Discretized data.

train(data, number_of_clusters_per_var=10)

Train the K-means discretizer by fitting K-means models for each variable (column) in the data. :param data: List of DataFrames to calculate model parameters from. :param number_of_clusters_per_var: Number of clusters (intervals) per variable.
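
Per-variable K-means discretization can be illustrated with a small one-dimensional Lloyd iteration in plain Python. This is a sketch under stated assumptions, not the library's implementation: the function names, the deterministic initialisation, and the fixed iteration count are all chosen for the example.

```python
def kmeans_1d(values, k, iterations=20):
    # Deterministic initialisation: spread the initial centres over the sorted values.
    ordered = sorted(values)
    centres = [ordered[(i * (len(ordered) - 1)) // max(k - 1, 1)] for i in range(k)]
    for _ in range(iterations):
        clusters = [[] for _ in range(k)]
        for v in values:
            nearest = min(range(k), key=lambda c: abs(v - centres[c]))
            clusters[nearest].append(v)
        # Move each centre to the mean of its members; keep it if the cluster is empty.
        centres = [sum(c) / len(c) if c else centres[i] for i, c in enumerate(clusters)]
    return sorted(centres)


def assign(values, centres):
    # Label each value with the index of its nearest centre.
    return [min(range(len(centres)), key=lambda c: abs(v - centres[c])) for v in values]


temperature = [20.1, 19.8, 20.3, 35.2, 34.9, 35.5, 50.0, 49.7]
centres = kmeans_1d(temperature, k=3)
labels = assign(temperature, centres)
```

Sorting the centres makes the cluster labels follow the value order, which keeps the discretized symbols easy to read. Whether ml4cps orders its labels this way is not stated above.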

MultivariateKMeansDiscretizer

Bases: TimeSeriesDiscretizer

A class that implements multivariate K-means discretization. It clusters the data based on all variables (columns) together into a predefined number of clusters.

discretize(df, return_str=False, append_discr=None)

Discretize data into clusters determined by K-means. :param df: DataFrame to discretize. :param return_str: Whether to return discretized data as concatenated strings. :return: Discretized data (cluster labels).

train(data, number_of_clusters=10)

Train the K-means discretizer by fitting a single K-means model for all variables. :param data: List of DataFrames to calculate model parameters from. :param number_of_clusters: Number of clusters.

TimeSeriesDiscretizer

Abstract class that encapsulates methods used for the discretization of time series.

vis

ml4cps.vis

The module provides methods to visualize various kinds of data, such as time series or automata graphs.

Authors:
- Nemanja Hranisavljevic, hranisan@hsu-hh.de, nemanja@ai4cps.com
- Tom Westermann, tom.westermann@hsu-hh.de, tom@ai4cps.com

add_time_frames_to_subplots(fig, trace_indices=None, frame_duration_ms=80, transition_duration_ms=0, redraw=False, keep_tail=True, sort_each_trace_by_x=True, frame_stride=1, slider_prefix='Time: ', title_prefix=None)

Add animation frames to a subplot figure where each subplot has one scatter trace and x represents time.

Parameters

fig : go.Figure
A subplot figure.

trace_indices : sequence[int] | None
Which traces to animate. If None, animate all traces in fig.data.

frame_duration_ms : int
Duration of each frame in milliseconds.

transition_duration_ms : int
Duration of frame transition in milliseconds.

redraw : bool
Passed to Plotly animation args.

keep_tail : bool
If True, each frame shows all points up to time t. If False, each frame shows only the current point for each trace.

sort_each_trace_by_x : bool
If True, sort each trace by its own x values before animating.

frame_stride : int
Use every n-th point to reduce the number of frames.

slider_prefix : str
Prefix shown before the current slider value.

title_prefix : str | None
Optional title prefix, e.g. "t = ".

Returns

go.Figure The same figure, updated in place with frames and controls.
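
The effect of keep_tail and frame_stride can be sketched without Plotly. The helper below is hypothetical and only shows which points each frame would contain for one trace. It assumes that a stride skips frames but still keeps the skipped points in the tail, which the parameter descriptions do not state explicitly.

```python
def build_frames(x, y, keep_tail=True, frame_stride=1):
    # Sort points by x (time) so frames advance chronologically.
    points = sorted(zip(x, y))
    frames = []
    for i in range(0, len(points), frame_stride):
        # keep_tail=True: everything up to the current time; False: current point only.
        visible = points[: i + 1] if keep_tail else points[i : i + 1]
        frames.append({
            "name": str(points[i][0]),
            "x": [p[0] for p in visible],
            "y": [p[1] for p in visible],
        })
    return frames


frames_tail = build_frames([0, 1, 2, 3], [5, 6, 7, 8], keep_tail=True, frame_stride=2)
frames_point = build_frames([0, 1, 2, 3], [5, 6, 7, 8], keep_tail=False)
```

With a stride of 2 the four points produce two frames; without a tail, every frame holds exactly one point.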

export_plotly_frames_animation_cumulative(fig, output_path, fps=5, width=1200, height=800, scale=2, cleanup_frames=True, frame_dir=None)

Export a Plotly animated figure by cumulatively applying go.Frame updates. This better matches Plotly's animation behavior for partial frame updates.

plot2d(df, x=None, y=None, mode='markers', hovercolumns=None, figure=False, **args)

Creates a 2D scatter or line plot using Plotly based on the provided DataFrame columns.
Parameters:
    df (pd.DataFrame): The input DataFrame containing the data to plot.
    x (str, optional): The column name to use for the x-axis.
    y (str, optional): The column name to use for the y-axis.
    mode (str, optional): The Plotly scatter mode (e.g., 'markers', 'lines'). Defaults to 'markers'.
    hovercolumns (list of str, optional): List of column names to include in the hover tooltip.
    figure (bool, optional): If True, returns a Plotly Figure object; otherwise, returns a Scatter trace. Defaults to False.
    **args: Additional keyword arguments passed to the Plotly Scatter constructor.
Returns:
    plotly.graph_objs._scatter.Scatter or plotly.graph_objs._figure.Figure: The generated Plotly Scatter trace or Figure, depending on the 'figure' parameter.
Example:
    plot2d(df, x='feature1', y='feature2', hovercolumns=['label'], mode='markers', figure=True)

plot3d(df, x=None, y=None, z=None, mode='markers', hovercolumns=None, **args)

Creates a 3D scatter plot using Plotly's Scatter3d, with customizable axes, hover information, and additional plot arguments.
Parameters:
    df (pandas.DataFrame): The data source containing columns for x, y, z, and optional hover data.
    x (str, optional): The column name in df to use for the x-axis.
    y (str, optional): The column name in df to use for the y-axis.
    z (str, optional): The column name in df to use for the z-axis.
    mode (str, optional): Plotly scatter mode (e.g., 'markers', 'lines'). Defaults to 'markers'.
    hovercolumns (list of str, optional): List of column names in df to include in the hover tooltip.
    **args: Additional keyword arguments passed to go.Scatter3d.
Returns:
    plotly.graph_objs._scatter3d.Scatter3d: A Plotly 3D scatter plot object configured with the specified data and options.

plot_2d_contour_from_fun(fun, rangex=None, rangey=None, th=50, **kwargs)

Plots a 2D contour of a function over a specified range.
Parameters:
    fun (callable): A function that takes a 2D array of shape (n_points, 2) and returns a 1D array of function values.
    rangex (tuple, optional): The range for the x-axis as (min, max). Defaults to (-5, 5) if not provided.
    rangey (tuple, optional): The range for the y-axis as (min, max). Defaults to (-5, 5) if not provided.
    th (int, optional): Unused parameter, kept for compatibility. Defaults to 50.
    **kwargs: Additional keyword arguments passed to the plotly.graph_objs.Contour constructor.
Returns:
    plotly.graph_objs.Contour: A Plotly contour plot object representing the function values over the specified range.

plot_cps(cps, dash_id=None, node_labels=False, edge_labels=True, node_size=40, node_font_size=20, edge_font_size=16, edge_text_max_width=None, output='cyto', dash_port=8050, height='100%', minZoom=0.5, maxZoom=2, **kwargs)

Plots all the components of a CPS in the same figure. :param cps: CPS to plot. :param node_labels: Whether node labels should be plotted. :param edge_labels: Whether edge labels should be plotted. :param node_size: The size of the nodes in the figure. :param node_font_size: The font size of the node labels. :param edge_font_size: The font size of the edge labels. :param edge_text_max_width: Max width of the edge labels. :param output: Whether the output is returned as a dash.Cytoscape component ("cyto") or a Dash server is run ("dash"). :param dash_port: The port to use if a temporary Dash server is run. :param kwargs: Other parameters are forwarded to the Cytoscape component.

plot_cps_component(cps, id=None, node_labels=False, center_node_labels=False, event_label=True, show_transition_freq=False, show_transition_timing=False, font_size=6, edge_font_size=6, edge_text_max_width=None, init_label=False, limit_interval_precision=2, show_transition_data=False, transition_data_keys=True, node_size=20, output='cyto', dash_port=8050, min_zoom=0.5, split_edges_diff_event=False, max_zoom=2, min_edge_thickness=0.1, max_edge_thickness=4, freq_as_edge_thickness=False, color='black', title_text=None, layout_name='breadthfirst', layout_spacingFactor=1, hide_nodes=None)

Visualizes a component of a Cyber-Physical System (CPS) as a graph using Dash Cytoscape.
This function generates a graphical representation of the discrete states and transitions of a CPS,
with various customization options for node and edge appearance, labels, and output format.
The visualization can be rendered as Dash Cytoscape elements, in a Dash app, or as a notebook widget.
Parameters:
    cps: object
        The CPS object containing discrete states, transitions, and related data.
    id: str, optional
        The unique identifier for the Cytoscape component (default: "graph").
    node_labels: bool, optional
        Whether to display labels on nodes (default: False).
    center_node_labels: bool, optional
        Whether to center node labels (default: False).
    event_label: bool, optional
        Whether to display labels on edges (default: True).
    show_transition_freq: bool, optional
        Whether to show transition frequency on edge labels (default: False).
    edge_font_size: int, optional
        Font size for edge labels (default: 6).
    edge_text_max_width: int or None, optional
        Maximum width for edge label text wrapping (default: None).
    init_label: bool, optional
        Whether to label initial state transitions as 'init' (default: False).
    show_transition_data: bool or list, optional
        Whether to display additional transition data on edge labels. If a list, only specified keys are shown (default: False).
    node_size: int, optional
        Size of the nodes (default: 20).
    output: str, optional
        Output format: "cyto" (Dash Cytoscape Div), "elements" (raw elements), "notebook" (inline Dash app), or "dash" (Dash app in browser) (default: "cyto").
    dash_port: int, optional
        Port for running the Dash app (default: 8050).
    min_zoom: float, optional
        Minimum zoom level for the Cytoscape component (default: 0.5).
    max_zoom: float, optional
        Maximum zoom level for the Cytoscape component (default: 2).
    min_edge_thickness: float, optional
        Minimum edge thickness for frequency-based scaling (default: 0.1).
    max_edge_thickness: float, optional
        Maximum edge thickness for frequency-based scaling (default: 4).
    freq_as_edge_thickness: bool, optional
        Whether to scale edge thickness based on transition frequency (default: False).
    color: str, optional
        Color for nodes and edges (default: "black"). If "hsu", uses a preset color.
    title_text: str or Dash component, optional
        Title text or component to display above the graph (default: None).
Returns:
    Dash component, dict, or Dash app:
        - If output == "cyto": returns a Dash html.Div containing the Cytoscape graph.
        - If output == "elements": returns a dict with 'nodes' and 'edges'.
        - If output == "notebook": runs and displays a Dash app inline (for Jupyter).
        - If output == "dash": runs a Dash app in the browser and returns the app instance.
Notes:
    - Requires Dash, dash_cytoscape, dash_bootstrap_components, and pandas.
    - The function supports interactive modals for displaying timing data on states and transitions.
    - Threading is used to launch the Dash app in browser mode without blocking the main program.
Layout options (layout_name):
    1. 'grid' → Places nodes in a simple rectangular grid.
    2. 'random' → Randomly positions nodes; useful for testing.
    3. 'circle' → Arranges nodes evenly around a circle.
    4. 'concentric' → Places nodes in concentric circles, often by degree or weight.
    5. 'breadthfirst' → Hierarchical layout (tree-like), good for state machines or DAGs. Optional params: directed=True, padding=
    6. 'cose' → Force-directed layout (spring simulation). Great for organic graphs. Optional params: idealEdgeLength, nodeRepulsion, gravity, numIter
    7. 'cose-bilkent' → Improved force-directed layout with better stability and aesthetics. (Requires: cyto.load_extra_layouts())
    8. 'cola' → Constraint-based force-directed layout; handles larger graphs well. (Requires: cyto.load_extra_layouts())
    9. 'euler' → Physically simulated layout; looks natural and dynamic. (Requires: cyto.load_extra_layouts())
    10. 'avsdf' → Circular layout optimized to reduce edge crossings. (Requires: cyto.load_extra_layouts())
    11. 'spread' → Distributes disconnected components evenly across space. (Requires: cyto.load_extra_layouts())
    12. 'klay' → Layered (hierarchical) layout, excellent for flowcharts or process models. (Requires: cyto.load_extra_layouts())
    13. 'dagre' → Directed acyclic graph layout, ideal for workflows and automata. (Requires: cyto.load_extra_layouts()) Optional params: rankDir='TB' (top-bottom), 'LR' (left-right), etc.

plot_cps_plotly(cps, layout='kamada_kawai', marker_size=20, node_positions=None, show_events=True, show_num_occur=False, show_state_label=True, font_size=10, plot_self_transitions=True, use_previos_node_positions=False, **kwargs)

Visualizes a Cyber-Physical System (CPS) state-transition graph using Plotly.
This function generates an interactive Plotly figure representing the states and transitions of a CPS. Nodes represent system states, and edges represent transitions. Various layout algorithms and display options are supported.
Args:
    cps: The CPS object containing the state-transition graph. Must have attributes _G (networkx graph), get_transitions(), print_state(), num_occur(), and previous_node_positions.
    layout (str, optional): Layout algorithm for node positioning. Options are "dot", "spectral", "kamada_kawai" (default), or "fruchterman_reingold".
    marker_size (int, optional): Size of the node markers. Default is 20.
    node_positions (dict, optional): Precomputed node positions as a dictionary {node: (x, y)}. If None, positions are computed using the selected layout.
    show_events (bool, optional): Whether to display event labels on transitions. Default is True.
    show_num_occur (bool, optional): Whether to display the number of occurrences for each transition. Default is False.
    show_state_label (bool, optional): Whether to display state labels on nodes. Default is True.
    font_size (int, optional): Font size for transition/event labels. Default is 10.
    plot_self_transitions (bool, optional): Whether to plot self-loop transitions. Default is True.
    use_previos_node_positions (bool, optional): If True and node_positions is None, reuse positions from cps.previous_node_positions. Default is False.
    **kwargs: Additional keyword arguments passed to the layout function (e.g., for networkx layouts).
Returns:
    plotly.graph_objs.Figure: A Plotly figure object representing the CPS state-transition graph.
Notes:
    - Requires Plotly, NetworkX, and pydotplus (for "dot" layout).
    - The CPS object must provide the required methods and attributes as described above.
    - Edge and node styling can be further customized by modifying the function.

plot_dash_frames(graph_frames, dash_port=8050)

Launches an interactive Dash web application to visualize a sequence of graph frames with a slider for manual frame selection.
Args:
    graph_frames (list): A list of Dash components (e.g., Cytoscape graphs) representing different frames to display.
    dash_port (int, optional): The port number on which to run the Dash server. Defaults to 8050.
Returns:
    dash.Dash: The Dash application instance.
Side Effects:
    - Starts a Dash server in a separate thread.
    - Opens the default web browser to display the Dash app.
    - Waits for user input before returning.
Notes:
    - The app displays the first frame by default and allows users to select other frames using a slider.
    - The function blocks until the user presses Enter in the console.

plot_execution_tree(graph, nodes_to_color, color, font_size=30)

Plots a system execution tree as a graph, where the horizontal position of nodes corresponds to their timestamps and the tree branches vertically.
Args:
    graph (networkx.DiGraph): A directed graph where each node represents a system state, and edges represent transitions. Each node should have a 'label' (str) and 'weight' (int) attribute. Node names must be timestamp strings in the format "%d/%m/%Y, %H:%M:%S".
    nodes_to_color (list): List of node identifiers (timestamp strings) to be highlighted with a specific color.
    color (str): The color to use for highlighting nodes in nodes_to_color.
    font_size (int, optional): Font size for node labels in the visualization. Defaults to 30.
Returns:
    cyto.Cytoscape: A Dash Cytoscape object representing the execution tree visualization, with nodes positioned by timestamp and colored as specified.
Notes:
    - The function assumes the first node in graph.nodes is the starting node.
    - Node positions are determined by the time difference from the start node (x-axis) and their 'weight' attribute (y-axis).
    - Nodes in nodes_to_color are colored with the specified color; all others are gray.
    - Requires the cyto (Dash Cytoscape) library and datetime module.
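
The positioning rule described in the notes can be sketched with the standard library alone. The helper below is hypothetical: it takes a plain mapping of timestamp string to weight instead of a networkx graph, and it assumes the x-offset is measured in seconds, which the docstring does not specify.

```python
from datetime import datetime

TIMESTAMP_FORMAT = "%d/%m/%Y, %H:%M:%S"


def node_positions(nodes):
    # nodes: mapping of timestamp string -> weight, in insertion order (first = start node).
    names = list(nodes)
    start = datetime.strptime(names[0], TIMESTAMP_FORMAT)
    positions = {}
    for name in names:
        elapsed = (datetime.strptime(name, TIMESTAMP_FORMAT) - start).total_seconds()
        # x: seconds since the start node (unit is an assumption); y: the node's weight.
        positions[name] = (elapsed, nodes[name])
    return positions


positions = node_positions({
    "01/03/2024, 08:00:00": 0,
    "01/03/2024, 08:00:30": 1,
    "01/03/2024, 08:02:00": 2,
})
```

Node names that do not match the timestamp format make `datetime.strptime` raise a `ValueError`, so it is worth validating them before plotting.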

plot_state_transitions(ta, state, obs=None)

Visualizes the outgoing state transitions from a given state in a timed automaton, along with associated observation data.
Parameters:
    ta: An object representing the timed automaton, expected to have an out_transitions(state) method that returns transitions from the given state.
    state: The current state for which outgoing transitions and associated observations are to be visualized.
    obs (optional): A pandas DataFrame containing observation data. Must include at least the columns 'Mode', 'q_next', 'Duration', 'Time', and optionally 'Vergussgruppe', 'HID', 'ChipID', 'Order', and 'ArtNr'. If None, the function raises NotImplemented.
Returns:
    fig: A Plotly figure object containing subplots for each outgoing transition. For each transition, the function displays:
        - A scatter plot of observation durations over time, grouped by 'Vergussgruppe'.
        - A histogram of durations for each 'Vergussgruppe'.
    The subplots are arranged with shared axes and appropriate titles for each transition.
Raises:
    NotImplemented: If obs is None.
Notes:
    - The function expects certain columns to exist in the obs DataFrame. If missing, default values are assigned.
    - Colors for different 'Vergussgruppe' groups are assigned from DEFAULT_PLOTLY_COLORS.
    - The function uses Plotly's make_subplots, go.Scatter, and go.Histogram for visualization.

plot_stateflow(stateflow, color_mapping=None, state_col='State', task_col='Task', bar_height=12, start_column='Start', finish_column='Finish', return_figure=False, description_col='Description', idle_states=None)

Visualizes state transitions over time for one or more tasks/stations as a Gantt-like interactive timeline.

Parameters:
- stateflow (DataFrame or dict): DataFrame with state transitions, or a dictionary of DataFrames per station.
- color_mapping (dict, optional): Mapping of state names to colors. If None, default colors are used.
- state_col (str): Column name indicating the state (default: 'State').
- bar_height (int): Height of the timeline bars (default: 12).
- start_column (str): Column name with start timestamps (default: 'Start').
- finish_column (str): Column name with end timestamps (default: 'Finish').
- return_figure (bool): If True, returns a Plotly Figure. Otherwise, returns a list of Plotly traces.
- description_col (str or list): Column(s) to include in the hover tooltip (default: 'Description').
- idle_states (str or list): State(s) to exclude from the plot (e.g., 'IDLE').

Returns:
- Plotly Figure or list of traces, depending on return_figure.

Example

fig = plot_stateflow(df, state_col='Mode', start_column='StartTime', finish_column='EndTime', return_figure=True)
fig.show()

This function is ideal for visualizing process flows, machine states, or event-based logs with time intervals.

plot_timeseries(data, timestamp=None, mode_data=None, discrete=False, title=None, use_columns=None, height=None, limit_num_points=None, names=None, xaxis_title=None, customdata=None, iterate_colors=True, y_title_font_size=14, opacity=1.0, vertical_spacing=0.005, sharey=False, bounds=None, plot_only_changes=False, yAxisLabelOffset=False, marker_size=4, showlegend=False, mode_height=0.2, x_title=None, **kwargs)

Plot one or more time-series datasets as vertically stacked Plotly subplots.

Each selected column is plotted in its own subplot. If multiple datasets are provided, each subplot contains one trace per dataset for that column. An optional mode_data subplot can be added above the signal subplots.

Parameters

data : pandas.DataFrame | dict | array-like | list[pandas.DataFrame | dict | array-like]
One dataset or a list of datasets to plot. Non-DataFrame inputs are converted to pandas DataFrames.

timestamp : str | int | None, default None
Column name or column index to set as the index for each dataset before plotting. Ignored if None.

mode_data : pandas.DataFrame | pandas.Series | numpy.ndarray | list[...] | None, default None
Optional mode/categorical signal(s) to plot in a dedicated top subplot. For DataFrame inputs, a Mode column is required. If a Time column exists, it is used for the x-axis; otherwise the index is used.

discrete : bool, default False
If True, plot signal traces as markers only. If False, use the Plotly mode specified by mode.

title : str | None, default None
Figure title.

use_columns : sequence[str] | None, default None
Columns to plot. If None, all columns from the first dataset are used.

height : int | None, default None
Figure height in pixels. If None, a default height is computed from the number of subplots.

limit_num_points : int | None, default None
Maximum number of points to plot from each trace. If None or negative, all available points are used.

names : sequence[str] | None, default None
Dataset names used in legend entries. If omitted, traces are named by their dataset index.

xaxis_title : str | None, default None
Title applied to all x-axes.

customdata : pandas.DataFrame | array-like | None, default None
Extra per-point hover data. This is attached to each signal trace and added to the hover tooltip. Row count should align with plotted points.

iterate_colors : bool, default True
If True, cycle through Plotly default colors per dataset. If False, use the first default color for every dataset.

y_title_font_size : int, default 14
Font size used for subplot y-axis titles.

opacity : float, default 1.0
Opacity applied to generated trace colors.

vertical_spacing : float, default 0.005
Vertical spacing between subplot rows.

sharey : bool, default False
If True, share y-axes across subplot rows.

bounds : tuple[pandas.DataFrame, pandas.DataFrame] | None, default None
Pair of (upper_bounds, lower_bounds) DataFrames used to draw a filled band for each plotted column.

plot_only_changes : bool, default False
Only relevant when discrete=True. If True, only points where the signal changes are plotted, along with the first point.

yAxisLabelOffset : bool, default False
If True, progressively increases y-axis title standoff on lower subplots.

marker_size : int, default 4
Marker size for discrete and mode traces.

showlegend : bool, default False
Whether to display the figure legend.

mode_height : float, default 0.2
Relative height of the mode_data subplot, when present.

x_title : str | None, default None
Title applied only to the bottom x-axis.

**kwargs
Additional keyword arguments forwarded to go.Scatter.

Returns

plotly.graph_objects.Figure The generated Plotly figure.

Raises

ValueError If input data is missing, mode_height is invalid, or a mode trace is missing its required columns.

KeyError If one of the requested columns is not found in a dataset or bounds data.

Notes
  • The returned figure includes the selected columns only.
  • If mode_data is provided, it occupies the first subplot row.
  • customdata is filtered per trace when plot_only_changes=True.

plot_transition(self, s, d)

Plots the transition histogram between two states.
Retrieves the transition data between the source state s and destination state d, and generates a Plotly figure visualizing the timing distribution of the transition. The plot includes a title, an annotation indicating the transition, and a histogram of the transition timings.
Args:
    s: The source state identifier.
    d: The destination state identifier.
Returns:
    plotly.graph_objs._figure.Figure: A Plotly Figure object containing the histogram of transition timings.

view_graphviz(self, layout='dot', marker_size=20, node_positions=None, show_events=True, show_num_occur=False, show_state_label=True, font_size=10, plot_self_transitions=True, use_previos_node_positions=False, **kwargs)

Visualizes the internal graph structure using Graphviz and returns a pydot graph object.
Parameters:
    layout (str): The layout algorithm to use for node positioning (default: "dot").
    marker_size (int): Size of the node markers in the visualization (default: 20).
    node_positions (dict or None): Optional dictionary mapping node names to (x, y) positions. If None, positions are computed.
    show_events (bool): Whether to display event labels on transitions (default: True).
    show_num_occur (bool): Whether to display the number of occurrences for each transition (default: False).
    show_state_label (bool): Whether to display state labels on nodes (default: True).
    font_size (int): Font size for labels and annotations (default: 10).
    plot_self_transitions (bool): Whether to plot self-loop transitions (default: True).
    use_previos_node_positions (bool): Whether to reuse previously computed node positions (default: False).
    **kwargs: Additional keyword arguments for customization.
Returns:
    pdp.Dot: A pydot graph object representing the visualized graph.
Notes:
    - Node positions are either computed using Graphviz or taken from the provided/previous positions.
    - Annotations for transitions can include event names and/or occurrence counts.
    - The function prepares the graph for further rendering or export, but does not display it directly.

tools

ml4cps.tools

Various methods to transform data.

binary_ordinal_encode(column, order)

Encodes a pandas Series with binary ordinal encoding based on the specified order.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| column | Series | The column to encode. | required |
| order | list | The ordered list of unique values in the column. | required |

Returns:

| Type | Description |
|---|---|
| pd.DataFrame | The binary ordinal encoded DataFrame for the given column. |
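
The docstring does not spell out the bit layout. One common reading of binary ordinal encoding is a thermometer code, in which a value at rank k in order sets the first k indicator columns to 1. The plain-Python sketch below illustrates that reading only; the function name, the column naming, and the number of columns are assumptions and may differ from what ml4cps returns.

```python
def thermometer_encode(column, order):
    rank = {value: i for i, value in enumerate(order)}
    rows = []
    for value in column:
        # One indicator per non-lowest level: 1 if the value is at or above that level.
        rows.append({
            f">={level}": int(rank[value] >= i)
            for i, level in enumerate(order)
            if i > 0
        })
    return rows


encoded = thermometer_encode(["low", "high", "medium"], order=["low", "medium", "high"])
```

Under this reading, neighbouring levels differ in exactly one bit, which preserves the ordering information that a plain one-hot encoding would lose.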

generate_random_walk(start_values, steps=100)

Generates a random walk process for multiple variables.

Parameters:
- start_values (list): A list of starting values for each variable.
- steps (int): Number of steps in the random walk.

Returns:
- pd.DataFrame: DataFrame containing the random walk process for each variable.
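
A random walk for several variables can be sketched with the standard library. The step distribution is not documented above, so the sketch assumes independent standard-normal increments. The `seed` argument, the list-of-rows return type, and the inclusion of the starting row are likewise assumptions made for this example.

```python
import random


def random_walk(start_values, steps=100, seed=None):
    rng = random.Random(seed)
    current = list(start_values)
    rows = [list(current)]  # first row holds the starting values
    for _ in range(steps):
        # Assumption: independent standard-normal increments for each variable.
        current = [value + rng.gauss(0.0, 1.0) for value in current]
        rows.append(list(current))
    return rows


walk = random_walk([0.0, 10.0], steps=5, seed=42)
```

Each row holds one time step and each position in the row one variable, so the result maps directly onto the rows and columns of a DataFrame.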

log_plotly_figure_to_mlflow(fig, artifact_file=None, artifact_dir='figures', file_stem='plotly_figure', include_run_id=True, start_run_if_needed=False)

Log a Plotly figure artifact to the currently active MLflow run.

Parameters

fig : Plotly figure object.

artifact_file : str | None
Full MLflow artifact path (for example "figures/plot.html"). If None, a filename is generated automatically.

artifact_dir : str
Artifact directory used when artifact_file is not provided.

file_stem : str
Base filename (without extension) used when artifact_file is not provided.

include_run_id : bool
Whether to append the active run id to the generated filename.

start_run_if_needed : bool
If True, starts a new MLflow run when there is no active run. If False, raises an error when no run is active.

Returns

str Artifact path used for logging.
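
How the artifact path might be assembled from these parameters can be sketched without MLflow. The helper below is hypothetical: the underscore separator before the run id and the ".html" extension are assumptions, and the run id is passed in explicitly instead of being read from an active run.

```python
import posixpath


def build_artifact_path(artifact_file=None, artifact_dir="figures",
                        file_stem="plotly_figure", run_id=None, include_run_id=True):
    # An explicit artifact_file always wins.
    if artifact_file is not None:
        return artifact_file
    name = file_stem
    if include_run_id and run_id is not None:
        # Assumption: the run id is appended with an underscore separator.
        name = f"{name}_{run_id}"
    # Assumption: figures are stored as HTML; artifact paths use forward slashes.
    return posixpath.join(artifact_dir, name + ".html")


path = build_artifact_path(run_id="abc123")
explicit = build_artifact_path(artifact_file="figures/plot.html", run_id="abc123")
```

Using `posixpath` keeps the separator a forward slash on every operating system, which matches the "figures/plot.html" form shown in the parameter description.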

remove_timestamps_without_change(data, sig_names=None)

Removes timestamps where no values changed in comparison to the previous timestamp.
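
The filtering rule can be sketched on a list of per-timestamp records. This is an illustration in plain Python, not the library code: the function name, the record layout, and the "time" key are assumptions, and sig_names is read here as the subset of signals to watch for changes.

```python
def remove_rows_without_change(rows, sig_names=None):
    # rows: list of dicts, one per timestamp, ordered by time.
    kept = []
    previous = None
    for row in rows:
        names = sig_names if sig_names is not None else [k for k in row if k != "time"]
        snapshot = tuple(row[name] for name in names)
        # Keep the first row and any row where at least one watched signal changed.
        if previous is None or snapshot != previous:
            kept.append(row)
        previous = snapshot
    return kept


rows = [
    {"time": 0, "valve": 0, "pump": 1},
    {"time": 1, "valve": 0, "pump": 1},
    {"time": 2, "valve": 1, "pump": 1},
    {"time": 3, "valve": 1, "pump": 0},
]
all_signals = remove_rows_without_change(rows)
valve_only = remove_rows_without_change(rows, sig_names=["valve"])
```

Restricting the comparison to a subset of signals drops more rows, because changes in the unwatched signals no longer count.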