API Reference

Backend

ml4cps.automata

Automaton

Bases: CPSComponent

Automaton class is the main class for modeling various kinds of hybrid systems.
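As a rough, library-independent illustration of the discrete part of such a model, the sketch below stores transitions in the (source, event, dest) tuple form that the constructor accepts and checks whether an event sequence is accepted. The state names, the event names and the helpers `build_table` and `accepts` are hypothetical and only mirror the idea behind `Automaton.accepts`; they are not the ml4cps implementation.

```python
from collections import defaultdict

# Hypothetical transitions in the tuple form (source, event, dest).
TRANSITIONS = [
    ("idle", "start", "running"),
    ("running", "pause", "paused"),
    ("paused", "resume", "running"),
    ("running", "stop", "idle"),
]


def build_table(transitions):
    """Map each (source, event) pair to the list of possible destinations."""
    table = defaultdict(list)
    for source, event, dest in transitions:
        table[(source, event)].append(dest)
    return table


def accepts(table, initial, final, events):
    """Return True if `events` leads from `initial` to an accepting state.

    An empty `final` collection means that every state is accepting.
    """
    state = initial
    for event in events:
        dests = table.get((state, event), [])
        if not dests:
            return False  # no transition for this event in the current state
        if len(dests) > 1:
            raise ValueError(f"Nondeterministic choice from {state!r} on {event!r}")
        state = dests[0]
    return not final or state in final


table = build_table(TRANSITIONS)
print(accepts(table, "idle", {"idle"}, ["start", "pause", "resume", "stop"]))
print(accepts(table, "idle", {"idle"}, ["start", "pause"]))
```

The first call ends in the final state `idle`, the second one stops in `paused`, which is not final. Keeping one destination list per (source, event) pair also makes nondeterminism easy to detect, which is the property that `is_deterministic` checks on the real class.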

Source code in ml4cps/automata/base.py
class Automaton(CPSComponent):
    """
    Automaton class is the main class for modeling various kinds of hybrid systems.
    """

    def __init__(self, states: list = None, transitions: list = None,
                 unknown_state: str = 'raise', id="", initial_q=(), initial_r=None, final_q=(), super_states=(), decision_states=(),
                 **kwargs):
        """
        Class initialization from lists of elements.
        :param states: Discrete states / modes of continuous behavior.
        :param transitions: The transition information. If a collection of dicts, each dict should contain "source",
        "destination" and "event"; the remaining entries are stored as data of that transition. Alternatively, a
        collection of tuples of the form (source, event, dest, *) can be used.
        :param unknown_state: The name of the unknown state used during "play in"; if "raise", an exception will be raised.
        """
        self._G = nx.MultiDiGraph()
        if initial_q and isinstance(initial_q, str):
            initial_q = [initial_q]
        self.q0 = OrderedDict.fromkeys(initial_q)
        self.initial_r = initial_r
        if final_q and isinstance(final_q, str):
            final_q = [final_q]
        self.final_q = OrderedDict.fromkeys(final_q)
        self.previous_node_positions = None
        self.UNKNOWN_STATE = unknown_state
        if super_states is not None:
            if type(super_states) is str:
                self.__super_states = [super_states]
            else:
                self.__super_states = list(super_states)

            # self._G.add_nodes_from(self.__super_states)

        if decision_states is not None:
            if type(decision_states) is str:
                self.decision_states = [decision_states]
            else:
                self.decision_states = list(decision_states)

        if states is not None:
            self._G.add_nodes_from(states)

        if transitions is not None:
            for tr in transitions:
                if type(tr) is dict:
                    tr = dict(tr)  # work on a copy so that pop() does not modify the caller's dict
                    self._G.add_edge(tr.pop('source'), tr.pop('destination'), event=tr.pop('event'), **tr)
                else:
                    self._G.add_edge(tr[0], tr[2], event=tr[1])

        if 'discr_state_names' not in kwargs:
            kwargs['discr_state_names'] = ['Mode']
        elif type(kwargs['discr_state_names']) is str:
            kwargs['discr_state_names'] = [kwargs['discr_state_names']]
        CPSComponent.__init__(self, id, **kwargs)

    @property
    def Sigma(self):
        Sigma = []
        for x in self.transitions:
            if len(x) >= 3 and 'event' in x[2]:
                new_el = x[2]['event']
                if new_el not in Sigma:
                    Sigma.append(new_el)
        return Sigma

    @property
    def num_events(self):
        return len(self.Sigma)

    @property
    def discrete_states(self):
        return self._G.nodes

    @property
    def transitions(self):
        return self._G.edges(data=True)

    @property
    def discrete_state(self):
        if self._q is not None:
            if len(self._q) == 0:
                return ()
            return self._q[0]
        return None

    @discrete_state.setter
    def discrete_state(self, value):
        self._q = (value,)

    @property
    def state(self):
        """
        Automata discrete state is uni-variate.
        :return:
        """
        if len(self._q) < 1:
            raise Exception(f'State of "{self.id}" is empty.')
        return self._q[0], self._xt, self._xk

    @state.setter
    def state(self, state):
        if type(state) is not tuple:
            new_states = (state, (), ())
        elif len(state) < 3:
            new_states = [(), (), ()]
            for i, v in enumerate(state):
                new_states[i] = v
        else:
            new_states = state

        if new_states[0] not in self.discrete_states and new_states[0] not in self.__super_states and new_states[0] != self.UNKNOWN_STATE:
            raise ValueError(f'State "{new_states[0]}" is not a valid state. Valid states are: {self.discrete_states}')

        self._q = (new_states[0],)
        self._xt = new_states[1]
        self._xk = new_states[2]

    @property
    def num_modes(self):
        """
        Returns the number of modes in the automaton.
        :return: number of states.
        """
        return self._G.number_of_nodes()

    @property
    def num_transitions(self):
        """
        Returns the number of transitions in the automaton.
        :return: number of transitions.
        """
        return self._G.number_of_edges()

    def merge(self, q1, q2):
        """
        If two states are compatible, they are merged with the function merge. The transitions
        of the automaton, the in- and outdegree of the states and the number of transitions
        happening are adjusted.
        """
        intr = self.in_transitions(q2)
        # outtr_q1 = list(self.out_transitions(q1))
        outtr_q2 = list(self.out_transitions(q2))
        # nx.contracted_nodes(self._G, w, v, copy=False)
        # set event

        for tr in intr:
            if tr[0] != q2:
                self.add_single_transition(tr[0], q1, tr[2], timing=tr[-1]['timing'])
        # if q1=='q0' and q2 == 'q192':
        #     print('found')
        for tr in outtr_q2:
            dest = tr[1]
            if dest == q2:
                dest = q1
            self.add_single_transition(q1, dest, tr[2], timing=tr[-1]['timing'])
        self.remove_state(q2)

    def determinize(self, s, state_index, verbose=False):
        # out_tr = list(sorted(self.out_transitions(s), key=lambda x: state_index[x[1]]))
        if verbose:
            print('Determinize', s)
        # ind = sorted(, key=lambda x: state_index[x])
        # state_index_list = [None] * len(state_index)
        # for k, v in state_index.items():
        #     state_index_list[v] = k
        # if verbose:
        #     pprint.pprint(state_index_list[state_index[s]:], compact=True)

        to_analyze = [s]
        while to_analyze:  # state_ind < len(state_index_list):
            # print(state_ind)
            s = to_analyze.pop()
            # if not self.is_state(s):
            #     continue

            conflicting_transitions = {}
            to_merge = None
            for out_tr in self.out_transitions(s):
                if out_tr[2] in conflicting_transitions:
                    to_merge = (conflicting_transitions[out_tr[2]], out_tr[1])
                    break
                else:
                    conflicting_transitions[out_tr[2]] = out_tr[1]
            # out_tr = list(self.out_transitions(st))
            if to_merge is not None:
                if state_index[to_merge[0]] < state_index[to_merge[1]]:
                    merge_into = to_merge[0]
                    to_be_merged = to_merge[1]
                else:
                    merge_into = to_merge[1]
                    to_be_merged = to_merge[0]

                to_analyze.append(s)
                to_analyze.append(merge_into)
                if verbose:
                    print('Merge to determinize: ', to_be_merged, 'into', merge_into)
                self.merge(merge_into, to_be_merged)

        # to_determinize = []
        # for i, tr in enumerate(out_tr):
        #     if self.is_state(tr[1]):
        #         for tr2 in out_tr[:i:-1]:
        #             if self.is_state(tr2[1]) and tr[2] == tr2[2]:
        #                 # print(tr[1], ',', tr2[1])
        #                 if state_index[tr2[1]] > state_index[tr[1]]:
        #                     print('Merge to determinize: ', tr2[1], 'into', tr[1])
        #                     self.merge(tr[1], tr2[1])
        #                     to_determinize.append(tr[1])
        #                     # self.determinize(tr[1], state_index)
        #                 else:
        #                     print('Merge to determinize: ', tr[1], 'into', tr2[1])
        #                     self.merge(tr2[1], tr[1])
        #                     to_determinize.append(tr2[1])
        #                     # self.determinize(tr2[1], state_index)
        # for s in to_determinize:
        #     self.determinize(s, state_index)

    def accepts(self, string, return_states=False):
        state_path = []
        current_state = next(iter(self.q0))
        state_path.append(current_state)
        for symbol in string:
            trans = self.out_transitions(current_state, symbol)
            if trans is None or not trans:
                if return_states:
                    return False, state_path
                else:
                    return False
            elif len(trans) > 1:
                raise Exception(f'Too many transitions from {current_state} given symbol {symbol}.')
            else:
                current_state = trans[0][1]
                state_path.append(current_state)
        # With no final states defined (None or empty), every reachable state is accepting.
        accepted = not self.final_q or current_state in self.final_q
        if return_states:
            return accepted, state_path
        else:
            return accepted

    def generate(self, number_of_sequences=1, return_states=False, max_steps=100, prob_to_accept=0.5):
        if number_of_sequences > 1:
            res = [self.generate(return_states=return_states, max_steps=max_steps, prob_to_accept=prob_to_accept)
                   for _ in range(number_of_sequences)]
            return res
        state_path = []
        current_state = next(iter(self.q0))
        event_path = []
        state_path.append(current_state)
        while True:
            if current_state in self.final_q:
                if random.random() < prob_to_accept:
                    break
            trans = self.out_transitions(current_state)
            if trans is None or not trans:
                if current_state not in self.final_q:
                    raise Exception(f'State "{current_state}" is not an accepting state, but no possible transitions.')
                break
            else:
                new_trans = random.choices(list(trans), weights=[x[3].get('prob', 1) for x in trans])
                current_state = new_trans[0][1]
                current_event = new_trans[0][3]['event']
                state_path.append(current_state)
                event_path.append(current_event)
                if max_steps is not None and len(event_path) >= max_steps:
                    break
        if return_states:
            return event_path, state_path
        else:
            return event_path


    def try_merge_states(self, state1, state2, try_fun=None):
        """Merge state2 into state1 and update transitions."""
        old_G = self._G
        make_final = state2 in self.final_q and state1 not in self.final_q
        make_initial = state2 in self.q0 and state1 not in self.q0
        self._G = nx.contracted_nodes(self._G, state1, state2)
        # make it deterministic again
        events = [x[3]['event'] for x in self.out_transitions(state1)]
        deterministic = len(set(events)) == len(events)

        if deterministic:
            if make_initial:
                self.add_initial_state(state1)
            if make_final:
                self.add_final_state(state1)

            if try_fun is not None and not try_fun(self):
                self._G = old_G
                if make_final:
                    self.final_q.pop(state1)
                if make_initial:
                    self.q0.pop(state1)
        else:
            self._G = old_G


    def remove_transition(self, source, dest):
        """
        Remove a single transition from source to dest. If several parallel transitions exist, only one of them is removed.
        :param source: Source state.
        :param dest: Destination state.
        """
        self._G.remove_edge(source, dest)

    def is_decision(self, state, overall_state):
        return state in self.decision_states

    def get_alternatives(self, state, system_state):
        if self.is_decision(state, system_state):
            trans = [(x[3]['event'], dict()) for x in self.out_transitions(state)]
            return [(None, None)] + trans
        else:
            return None

    def remove_rare_transitions(self, min_p=0, min_num=0, keep_from_initial=False, keep_states=False, keep=None):
        self.learn_transition_probabilities()

        for source, dest, event, data in self.get_transitions():
            if keep_from_initial and source in self.q0:
                continue
            if (len(data['timing']) <= min_num or data['probability'] < min_p) and \
                    ((keep is None) or (keep is not None and source not in keep and dest not in keep)):
                # Remove exactly this transition; the third tuple element is the edge key.
                self._G.remove_edge(source, dest, key=event)

        if not keep_states:
            for s in list(self.discrete_states):
                # if s in self.q0:
                #     continue
                if len(self.in_transitions(s)) == 0 and len(self.out_transitions(s)) == 0:
                    self.remove_state(s)

            # if self.DummyInitial:
            #     for s in list(self.InitialState):
            #         if len(self.out_transitions(s)) == 0:
            #             self.States.pop(s)
            #             self.InitialState.pop(s)

        # recalculate probabilities
        if min_p:
            self.learn_transition_probabilities()

    def learn_transition_probabilities(self):
        for q in self.discrete_states:
            out_tr = list(self.out_transitions(q))
            total_num = sum(len(data['timing']) for _, _, _, data in out_tr)
            if total_num == 0:
                continue  # no observed timings, nothing to normalize
            for _, _, _, data in out_tr:
                data['probability'] = len(data['timing']) / total_num

    def state_is_deterministic(self, q):
        events = set()
        for tr in self.out_transitions(q):
            if tr[2] in events:
                return False
            else:
                events.add(tr[2])
        return True

    def update_timing_boundaries(self, source, destination, event, newTiming):
        edge_data = self._G.get_edge_data(source, destination, event)
        try:
            if newTiming < edge_data['minTiming']:
                edge_data['minTiming'] = newTiming
            elif newTiming > edge_data['maxTiming']:
                edge_data['maxTiming'] = newTiming
        except KeyError:
            edge_data['minTiming'] = newTiming
            edge_data['maxTiming'] = newTiming

    def is_deterministic(self):
        for q in self.discrete_states:
            if not self.state_is_deterministic(q):
                print('State', q, 'not deterministic:')
                for tr in sorted(self.out_transitions(q), key=lambda x: x[2]):
                    print(tr[0], '->', tr[2], '->', tr[1])
                return False
        return True

    def add_single_transition(self, s, d, e, timing=None):
        edge_data = self._G.get_edge_data(s, d, e)
        if timing is not None:
            try:
                timing = list(timing)
            except TypeError:  # a single, non-iterable timing value
                timing = [timing]
        if edge_data is None:
            if timing is None:
                # Store the event as well, so that event filters also find this transition.
                self._G.add_edge(s, d, key=e, event=e)
            else:
                self._G.add_edge(s, d, key=e, event=e, timing=timing)
        elif timing is not None:
            edge_data.setdefault('timing', []).extend(timing)

    def add_state_data(self, s: str, d: object):
        """
        Add data to state s of the automaton.
        :param s: state
        :param d: data to be added to s
        :return:
        """
        self.Q[s] = d

    def add_state(self, new_state, **kwargs):
        """
        Add state to the automaton.
        :param new_state: State to be added.
        """
        self._G.add_node(new_state, **kwargs)

    def add_states_from(self, new_state, **kwargs):
        """
        Add multiple states to the automaton.
        :param new_state: States to be added.
        """
        self._G.add_nodes_from(new_state, **kwargs)

    def add_transitions_from(self, list_of_tuples, **other):
        """
        Add multiple transitions.
        :param list_of_tuples: List of transitions in the form (source_state, destination_state, event, ...<unused>...).
        """
        self._G.add_edges_from(list_of_tuples, **other)

    def add_transition(self, s, d, e, **other):
        """
        Add a single transition.
        :param s: Source state.
        :param d: Destination state.
        :param e: Event; it is used as the edge key and stored in the 'event' attribute.
        :param other: Additional data stored on the transition.
        """
        self._G.add_edge(s, d, e, **other, event=e)

    def add_initial_state(self, states):
        """
        Add initial state(s) of the automaton.
        :param states: States to add.
        """
        if type(states) is str:
            states = (states,)
        for s in states:
            if s is not None and s not in self.q0:
                self.q0[s] = None
            self._G.add_node(s)

    def add_final_state(self, states):
        """
        Add final state(s) of the automaton.
        :param states: States to add.
        """
        if type(states) is str:
            states = (states,)
        for s in states:
            if s is not None and s not in self.final_q:
                self.final_q[s] = None
            self._G.add_node(s)

    def is_transition(self, s, d, e):
        """
        Check if a transition (s,d,e) exists in the automaton.
        :param s: Source.
        :param d: Destination.
        :param e: Event.
        :return: True if such a transition exists, False otherwise.
        """
        if not self._G.has_node(s):
            return False
        # Transitions are stored as edges of the graph; compare destination and 'event' attribute.
        return any(dest == d and data.get('event') == e
                   for _, dest, data in self._G.out_edges(s, data=True))

    # def num_occur(self, q, e):
    #     tr = self.get_transition(q, e=e)
    #     if tr[-1]:
    #         return len(tr[-1]['timing'])
    #     else:
    #         return ""

    def num_occur(self, tr):
        return len(tr[-1]['timing'])

    def num_timings(self):
        return sum(len(tr[-1]['timing']) for tr in self.get_transitions())

    def get_num_in(self, q):
        """
        Returns the number of in transitions of state q in the automaton.
        :return: number of transitions.
        """
        if self._G.has_node(q):
            return self._G.in_degree(q)
        else:
            raise Exception(f'State {q} not in the automaton.')

    def get_num_out(self, q):
        """
        Returns the number of out transitions of state q in the automaton.
        :return: number of transitions.
        """
        if self._G.has_node(q):
            return self._G.out_degree(q)
        else:
            raise Exception(f'State {q} not in the automaton.')

    def is_state(self, q):
        return self._G.has_node(q)

    def remove_state(self, s):
        self._G.remove_node(s)
        if s in self.q0:
            self.q0.pop(s)

    def in_transitions(self, s, event=None):
        """
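        Get all incoming transitions of state s.
        :param s: State whose incoming transitions are returned.
        :param event: If given, only transitions triggered by this event are returned.
        :return: The matching transitions as (source, destination, key, data) tuples.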
        """
        if event is None:
            return self._G.in_edges(s, data=True, keys=True)
        else:
            return [e for e in self._G.in_edges(s, data=True, keys=True) if e[3]['event'] == event]

    def out_transitions(self, s, event=None):
        """
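        Get all outgoing transitions of state s.
        :param s: State whose outgoing transitions are returned.
        :param event: If given, only transitions triggered by this event are returned.
        :return: The matching transitions as (source, destination, key, data) tuples.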
        """
        if event is None:
            return self._G.out_edges(s, data=True, keys=True)
        else:
            return [e for e in self._G.out_edges(s, data=True, keys=True) if e[3]['event'] == event]

    def discrete_event_dynamics(self, q, xt, xk, p) -> tuple:
        e = p["event"]
        new_q = self.UNKNOWN_STATE
        possible_destinations = set(d for s, d, ev in self._G.out_edges(q, data='event') if ev == e)
        if len(possible_destinations) == 1:
            new_q = possible_destinations.pop()
        else:
            stoch_dest = list((d, data.get('p', 0)) for s, d, data in self._G.out_edges(q, data=True) if data['event'] == e)
            if stoch_dest:
                new_q = random.choices([x[0] for x in stoch_dest], weights=[x[1] for x in stoch_dest])[0]
            else:  # try to recover
                dests = set(d for s, d, ev in self._G.edges(data='event') if ev == e)
                if len(dests) == 1:
                    new_q = dests.pop()
        return (new_q,), None, None

    def guards(self, q, x):
        pass

    def timed_event(self, q, xc, xd):
        possible_destinations = list(dict(ev, dest=d) for s, d, ev in self._G.out_edges(q, data=True) if s == q)
        if possible_destinations:
            if len(possible_destinations) == 1:
                dest = possible_destinations[0]
            else:
                dest = np.random.choice(possible_destinations, p=[p['prob']/sum(x['prob'] for x in possible_destinations) if 'prob' in p else 1/len(possible_destinations) for p in possible_destinations])
            if 'time' in dest:
                time = dest['time']
                if callable(time):
                    time = time()
                return time, dest.get('destination', dest['dest'])
        return None, None

    def get_transition(self, s, d=None, e=None, if_more_than_one='raise'):
        """
        Get the transition(s) with source state s, optionally filtered by destination state d and/or event e.
        If both d and e are None, all outgoing transitions of s are considered.
        :param if_more_than_one: If 'raise', an exception is raised when several transitions match; otherwise all
        matching transitions are returned.
        :param s: Source state.
        :param d: Destination state.
        :param e: Event.
        :return: The matching transition, a list of matching transitions, or None if there is none.
        """
        transitions = list(self._G.out_edges(s, keys=True, data=True))
        if d is not None:
            transitions = [trans for trans in transitions if trans[1] == d]
        if e is not None:
            transitions = [trans for trans in transitions if trans[2] == e]

        if len(transitions) > 1:
            if if_more_than_one == 'raise':
                raise Exception('There are multiple transitions which satisfy the condition.')
            else:
                return transitions
        elif len(transitions) == 0:
            return None
        else:
            return transitions[0]

    def rename_events(self, prefix="e_"):
        """
        Rename events to become e_0, e_1... The old id is stored in the field 'old_symbol' of the transition data.
        """
        # Sigma is a list of event labels, so build the mapping old -> new first and then
        # relabel every transition in a single pass (edge keys are left unchanged).
        new_events = {old: f'{prefix}{i}' for i, old in enumerate(self.Sigma)}
        for _, _, data in self._G.edges(data=True):
            if 'event' in data:
                data['old_symbol'] = data['event']
                data['event'] = new_events[data['event']]

    def step(self, q, x0, t, u):
        """
        Simulates one time step of continuous behavior from t to t+dt. The underlying function is solve_ivp with method 'RK23'.
        :param q: Current discrete state.
        :param x0: Initial state vector.
        :param t: Time at start of the step simulation.
        :param u: Extra arguments passed to the flow function.
        :return: Time t+dt, value of state at t+dt
        """
        s = solve_ivp(self.flow, t_span=(t, t + self.dt), y0=x0, method='RK23', args=u)
        xc = s.y[:, -1]
        t = s.t[-1]

        xk = self.time_discrete_dynamics(t, q, x0)
        return t, np.concatenate((xc, xk))  # np.concatenate expects a sequence of arrays

    def __str__(self):
        """
        String representation of the automaton.
        :return:
        """
        return f"""Automaton:
    Number of states: {self.num_modes}
    Number of transitions: {self.num_transitions}
    Initial state(s): {list(self.q0.keys())}
    Final state(s): {list(self.final_q.keys())}"""

    def flow(self, q, p, x, u):
        """
        Flow equation gives derivative of the continuous variables.
        :param q: Current discrete state of the model.
        :param p: Stochastic parameters generated on entry to state current_q.
        :param x: Current continuous state of the model.
        :param u: Calculated internal i signals.
        :return: Derivative of the continuous state x.
        """
        pass

    def inv(self, t, q, x, y, z, p):
        """
        Invariants.
        :param t:
        :param q:
        :param x:
        :param y:
        :param z:
        :param p:
        """
        pass

    def get_transitions(self):
        return list(self._G.edges(data=True, keys=True))

    def print_state(self, v):
        """Returns an HTML-formatted description of the outgoing transitions of a state v.

        Args:
            v (state): State whose outgoing transitions are described.

        Returns:
            String: Description of the outgoing transitions of the state.
        """
        s = f'<b>{str(v)}</b>'
        for tr in self.out_transitions(v):
            try:
                num_occur = f'[{self.num_occur(tr)}]'
            except:
                num_occur = '/'
            s += f"<br>{tr[2]} -> {tr[1]} {num_occur}"
        return s

    def sample_initial(self):
        if len(self.q0) == 0:
            current_q = np.random.choice(list(self.discrete_states.keys()), 1)[-1]
            warnings.warn(
                'Initial state not defined, sampling initial state uniformly from the set of all states.')
        else:
            current_q = np.random.choice(list(self.q0.keys()), 1)[-1]
        return current_q

    # def simulate(self, finish_time=100, current_q=None):
    #     """
    #     Simulates behaviour of the system.
    #     :param finish_time: Time when simulation finishes.
    #     :return: generated data.
    #     """
    #
    #     if current_q is None:
    #         if len(self.q0) == 0:
    #             current_q = np.random.choice(list(self.discrete_states.keys()), 1)[-1]
    #             warnings.warn(
    #                 'Initial state not defined, sampling initial state uniformly from the set of all states.')
    #         else:
    #             current_q = np.random.choice(list(self.q0.keys()), 1)[-1]
    #
    #     state_sequence = []
    #     data_state = []
    #     t = 0
    #     last_x = None
    #     last_output = None
    #     states = []
    #     data = []
    #     current_e = None
    #
    #     while True:
    #         cont_state, cont_time, cont_output = self.__step_continuous()
    #         last_x = dict(cont_state.iloc[-1])
    #         last_output = dict(cont_output.iloc[-1])
    #         cont_time = cont_time + t - cont_time.iloc[0, 0]
    #         clock = cont_time.iloc[-1, 0] - cont_time.iloc[0, 0]
    #
    #         current_state = current_q
    #
    #         observed_current_state = current_state
    #         state_sequence.append(
    #             pd.DataFrame(np.full((cont_time.size - 1, 3), (current_state, observed_current_state, current_e)),
    #                          index=cont_time.iloc[0:-1, 0],
    #                          columns=['State', 'Observed State', 'Event']))
    #
    #         data_state.append(cont_output.iloc[:-1].set_index(cont_time.iloc[0:-1, 0]))
    #
    #         tr = self.out_transitions(current_state)
    #
    #         if len(tr) == 0:
    #             break
    #         elif len(tr) != 1:
    #             tr = np.random.choice(tr, 1)[-1]
    #             warnings.warn('Multiple transitions can occur.')
    #         else:
    #             tr = tr[0]
    #
    #         last_q = current_q
    #         current_e = tr.event
    #         self.apply_sim_event(current_e)
    #         if cont_time.size == 0:
    #             break
    #         t += clock
    #
    #         if t >= finish_time:
    #             break
    #
    #         states.append(pd.concat(state_sequence, axis=0))
    #         data.append(pd.concat(data_state, axis=0))
    #     return states, data

    def predict_state(self, data_collection, time_col_name=None, discr_col_names=None):
        for data in data_collection:
            data["StateEstimate"] = None
            data["Event"] = None

            prev_discr_state = None
            prev_time = None

            if discr_col_names is None:
                discr_col_names = data.columns
            if time_col_name is not None:
                discr_col_names = [c for c in discr_col_names if c != time_col_name]

            for row in data[discr_col_names].itertuples(index=True):
                if time_col_name is not None:
                    time = data[time_col_name].iloc[row[0]]
                else:
                    time = row[0]
                discr_state = row[1:]
                if prev_discr_state is not None and prev_discr_state != discr_state:
                    event = np.asarray(discr_state) - np.asarray(prev_discr_state)
                    event = ' '.join(str(x) for x in event)
                    data.loc[row[0], "Event"] = event

                data.loc[row[0], "StateEstimate"] = signal_vector_to_state(discr_state)
                prev_discr_state = discr_state
                prev_time = time
        return data_collection


    def read_event(self, t, e, clear_p=False, keep_p=None, **kwargs):
        if keep_p:
            for k in keep_p:
                kwargs[k] = self._p[k]
        if clear_p or keep_p:
            self._p = kwargs
        else:
            self._p.update(kwargs)
        self._p.pop('Error Message', None)
        self._t = t

        self._p['event'] = e

        new_q, self._xt, self._xk = self.discrete_event_dynamics(self._q, self._xt, self._xk, self._p)
        new_q = new_q[0]

        if new_q == self.UNKNOWN_STATE:
            text2 = '{}->{}->{}'.format(self.discrete_state, e, new_q)
            self._p['Error Message'] = text2
        else:
            dl = self.decision_logic
            # d = self.check_decisions(self.x, self.overall_system.state)
            if dl and not self.overall_system.is_unknown(self):
                try:
                    state_key = json.dumps(self.overall_system.get_choice_state(self), sort_keys=True)
                except:
                    print('Problem getting choice state')
                old_e = self.choices_set.get(state_key, None)
                if old_e is not None:
                    if self._e not in old_e:
                        # warnings.warn(
                        #     '{}: Different decision for same state {}: {} vs {}'.format(t, state_key, old_e, self._e))
                        self.choices_set[state_key][self._e] = [str(t)]
                    else:
                        self.choices_set[state_key][self._e].append(str(t))
                else:
                    self.choices_set[state_key] = {self._e: [str(t)]}
                # if self._e not in (x[0] for x in d):
                #     warnings.warn('{}: Not allowed decision in state {}: {}'.format(t, state_key, self._e))

        self.discrete_state = new_q
        self._past_t.append(t)
        self._past_p.append(self._p)
        if len(self._discrete_state_data):
            self._discrete_state_data[-1]["Finish"] = t
        self._discrete_state_data.append(dict(Time=t, Event=e, Mode=self.discrete_state, **self._p))
        self._discrete_output_data.append([t, *self._d, self._p.get('event', None)])
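
The event labels written by `predict_state` in the listing above can be reproduced in isolation. The following is a minimal standalone sketch, not the library code: `label_events` is a hypothetical helper and plain tuples stand in for the DataFrame rows. An event is emitted whenever the discrete signal vector changes, and it is encoded as the space-separated element-wise difference between the new and the previous vector.

```python
def label_events(rows):
    """Return one event label (or None) per row of discrete signal values."""
    events = []
    prev = None
    for row in rows:
        if prev is not None and prev != row:
            # Element-wise difference between the current and previous vector.
            diff = [cur - old for cur, old in zip(row, prev)]
            events.append(" ".join(str(x) for x in diff))
        else:
            events.append(None)
        prev = row
    return events


signals = [(0, 0), (0, 0), (1, 0), (1, 1), (0, 1)]
events = label_events(signals)
print(events)  # [None, None, '1 0', '0 1', '-1 0']
```

Rows where nothing changed carry no event, which mirrors the `None` default that `predict_state` assigns to the "Event" column.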

num_modes property

Returns the number of modes in the automaton. :return: number of states.

num_transitions property

Returns the number of transitions in the automaton. :return: number of transitions.

state property writable

The discrete state of an automaton is univariate.

__init__(states=None, transitions=None, unknown_state='raise', id='', initial_q=(), initial_r=None, final_q=(), super_states=(), decision_states=(), **kwargs)

Class initialization from lists of elements. :param states: Discrete states / modes of continuous behavior. :param transitions: The transition information. If a collection of dicts is given, each dict should contain "source", "destination" and "event"; all other entries are stored as data of that transition. Alternatively, a collection of tuples of the form (source, event, dest, *) can be used. :param unknown_state: The name of unknown states during "play in"; if "raise", an exception will be raised.

Source code in ml4cps/automata/base.py
def __init__(self, states: list = None, transitions: list = None,
             unknown_state: str = 'raise', id="", initial_q=(), initial_r=None, final_q=(), super_states=(), decision_states=(),
             **kwargs):
    """
    Class initialization from lists of elements.
    :param states: Discrete states / modes of continuous behavior.
    :param transitions: The transition information. If a collection of dicts is given, each dict should contain
    "source", "destination" and "event"; all other entries are stored as data of that transition. Alternatively,
    a collection of tuples of the form (source, event, dest, *) can be used.
    :param unknown_state: The name of unknown states during "play in", if "raise", an exception will be raised.
    """
    self._G = nx.MultiDiGraph()
    if initial_q and isinstance(initial_q, str):
        initial_q = [initial_q]
    self.q0 = OrderedDict.fromkeys(initial_q)
    self.initial_r = initial_r
    if final_q and isinstance(final_q, str):
        final_q = [final_q]
    self.final_q = OrderedDict.fromkeys(final_q)
    self.previous_node_positions = None
    self.UNKNOWN_STATE = unknown_state
    if super_states is not None:
        if type(super_states) is str:
            self.__super_states = [super_states]
        else:
            self.__super_states = list(super_states)

        # self._G.add_nodes_from(self.__super_states)

    if decision_states is not None:
        if type(decision_states) is str:
            self.decision_states = [decision_states]
        else:
            self.decision_states = list(decision_states)

    if states is not None:
        self._G.add_nodes_from(states)

    if transitions is not None:
        for tr in transitions:
            if type(tr) is dict:
                self._G.add_edge(tr.pop('source'), tr.pop('destination'), event=tr.pop('event'), **tr)
            else:
                self._G.add_edge(tr[0], tr[2], event=tr[1])

    if 'discr_state_names' not in kwargs:
        kwargs['discr_state_names'] = ['Mode']
    elif type(kwargs['discr_state_names']) is str:
        kwargs['discr_state_names'] = [kwargs['discr_state_names']]
    CPSComponent.__init__(self, id, **kwargs)
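
The two accepted transition formats differ in field order, which is easy to get wrong. The sketch below is only an illustration under assumptions: `normalize_transition` is a hypothetical helper (it is not part of ml4cps) that maps both formats to one `(source, destination, event, data)` tuple, following the key names and tuple positions used in the constructor listing above.

```python
def normalize_transition(tr):
    """Return (source, destination, event, data) for a dict or tuple transition."""
    if isinstance(tr, dict):
        data = dict(tr)  # copy so the caller's dict is not mutated
        return data.pop("source"), data.pop("destination"), data.pop("event"), data
    # Tuple form is (source, event, destination, ...): the event sits in the middle.
    source, event, destination = tr[0], tr[1], tr[2]
    return source, destination, event, {}


as_dict = {"source": "idle", "destination": "heating", "event": "on", "timing": [1.5]}
as_tuple = ("heating", "off", "idle")

edges = [normalize_transition(t) for t in (as_dict, as_tuple)]
for edge in edges:
    print(edge)
```

Note that the constructor as listed pops the keys from each dict it receives, so the caller's dicts are modified; the sketch copies the dict first to avoid that side effect.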

__str__()

String representation of the automaton. :return:

Source code in ml4cps/automata/base.py
def __str__(self):
    """
String representation of the automaton.
    :return:
    """
    return f"""Automaton:
Number of states: {self.num_modes}
Number of transitions: {self.num_transitions}
Initial state(s): {list(self.q0.keys())}
Final state(s): {list(self.final_q.keys())}"""

add_final_state(states)

Add final state(s) of the automaton. :param states: States to add.

Source code in ml4cps/automata/base.py
def add_final_state(self, states):
    """
Add final state(s) of the automaton.
    :param states: States to add.
    """
    if type(states) is str:
        states = (states,)
    for s in states:
        if s is not None and s not in self.final_q:
            self.final_q[s] = None
        self._G.add_node(s)

add_initial_state(states)

Add initial state(s) of the automaton. :param states: States to add.

Source code in ml4cps/automata/base.py
def add_initial_state(self, states):
    """
Add initial state(s) of the automaton.
    :param states: States to add.
    """
    if type(states) is str:
        states = (states,)
    for s in states:
        if s is not None and s not in self.q0:
            self.q0[s] = None
        self._G.add_node(s)
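
The initial and final states are kept in an `OrderedDict` whose values are all `None`, so it behaves like an insertion-ordered set. A minimal sketch of that pattern, with a hypothetical free function `add_initial` standing in for the method above:

```python
from collections import OrderedDict

# Keys are the states; the values are unused placeholders.
q0 = OrderedDict.fromkeys(["q0", "q1"])


def add_initial(q0, states):
    """Add one state (str) or several states, skipping None and duplicates."""
    if isinstance(states, str):
        states = (states,)
    for s in states:
        if s is not None and s not in q0:
            q0[s] = None


add_initial(q0, "q2")
add_initial(q0, ["q1", "q3"])
print(list(q0))  # ['q0', 'q1', 'q2', 'q3']
```

Duplicates are ignored and the first insertion order is preserved, which keeps the list of initial states printed by `__str__` stable.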

add_state(new_state, **kwargs)

Add state to the automaton. :param new_state: State to be added.

Source code in ml4cps/automata/base.py
def add_state(self, new_state, **kwargs):
    """
Add state to the automaton.
    :param new_state: State to be added.
    """
    self._G.add_node(new_state, **kwargs)

add_state_data(s, d)

Add state data to a state s the automaton. :param s: state :param d: data to be added to s :return:

Source code in ml4cps/automata/base.py
def add_state_data(self, s: str, d: object):
    """
Add state data to a state s the automaton.
    :param s: state
    :param d: data to be added to s
    :return:
    """
    self.Q[s] = d

add_states_from(new_state, **kwargs)

Add multiple states to the automaton. :param new_state: States to be added.

Source code in ml4cps/automata/base.py
def add_states_from(self, new_state, **kwargs):
    """
Add multiple states to the automaton.
    :param new_state: States to be added.
    """
    self._G.add_nodes_from(new_state, **kwargs)

add_transition(s, d, e, **other)

Add a single transition. :param s: Source state. :param d: Destination state. :param e: Event that triggers the transition; it is also used as the edge key. :param other: Additional attributes stored as data of the transition.

Source code in ml4cps/automata/base.py
def add_transition(self, s, d, e, **other):
    """
Add a single transition.
    :param s: Source state.
    :param d: Destination state.
    :param e: Event that triggers the transition; it is also used as the edge key.
    :param other: Additional attributes stored as data of the transition.
    """
    self._G.add_edge(s, d, e, **other, event=e)

add_transitions_from(list_of_tuples, **other)

Add multiple transitions. :param list_of_tuples: List of transitions in the form (source_state, destination_state, event, ......).

Source code in ml4cps/automata/base.py
def add_transitions_from(self, list_of_tuples, **other):
    """
Add multiple transitions.
    :param list_of_tuples: List of transitions in the form (source_state, destination_state, event, ...<unused>...).
    """
    self._G.add_edges_from(list_of_tuples, **other)

flow(q, p, x, u)

Flow equation gives derivative of the continuous variables. :param q: Current discrete state of the model. :param p: Stochastic parameters generated on entry to state current_q. :param x: Current continuous state of the model. :param u: Calculated internal i signals. :return: Derivative of the continuous state x.

Source code in ml4cps/automata/base.py
def flow(self, q, p, x, u):
    """
Flow equation gives derivative of the continuous variables.
    :param q: Current discrete state of the model.
    :param p: Stochastic parameters generated on entry to state current_q.
    :param x: Current continuous state of the model.
    :param u: Calculated internal i signals.
    :return: Derivative of the continuous state x.
    """
    pass

get_num_in(q)

Returns the number of in transitions of state q in the automaton. :return: number of transitions.

Source code in ml4cps/automata/base.py
def get_num_in(self, q):
    """
    Returns the number of in transitions of state q in the automaton.
    :return: number of transitions.
    """
    if self._G.has_node(q):
        return self._G.in_degree(q)
    else:
        raise Exception(f'State {q} not in the automaton.')

get_num_out(q)

Returns the number of out transitions of state q in the automaton. :return: number of transitions.

Source code in ml4cps/automata/base.py
def get_num_out(self, q):
    """
    Returns the number of out transitions of state q in the automaton.
    :return: number of transitions.
    """
    if self._G.has_node(q):
        return self._G.out_degree(q)
    else:
        raise Exception(f'State {q} not in the automaton.')

get_transition(s, d=None, e=None, if_more_than_one='raise')

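Get the transitions with source state s and destination state d. When e is provided, only transitions whose event is e are considered. :param if_more_than_one: If 'raise', an exception is raised when several transitions match; otherwise the list of matches is returned. :param s: Source state. :param d: Destination state. :param e: Event. :return: None if no transition matches, the single matching transition, or a list of matches.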

Source code in ml4cps/automata/base.py
def get_transition(self, s, d=None, e=None, if_more_than_one='raise'):
    """
    Get all transitions with source state s, destination state d. In case when e is provided, the returned list
    contains transitions where event is e.
    :param if_more_than_one:
    :param s: Source state.
    :param d: Destination state.
    :param e: Event.
    :return:
    """
    transitions = list(self._G.out_edges(s, keys=True, data=True))
    # Filter only by the criteria that were given; with d and e both None, all
    # outgoing transitions of s are kept. trans[2] is the edge key.
    if d is not None:
        transitions = [trans for trans in transitions if trans[1] == d]
    if e is not None:
        transitions = [trans for trans in transitions if trans[2] == e]

    if len(transitions) > 1:
        if if_more_than_one == 'raise':
            raise Exception('There are multiple transitions which satisfy the condition.')
        else:
            return transitions
    elif len(transitions) == 0:
        return None
    else:
        return transitions[0]
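
Callers of `get_transition` have to handle three result shapes: `None`, a single transition, or a list. The sketch below mimics that contract on a plain list of `(source, destination, key, data)` tuples. It is an illustration only: `select` is a hypothetical stand-in, and it assumes that `None` for `d` or `e` means "do not filter on this field".

```python
def select(transitions, d=None, e=None, if_more_than_one="raise"):
    """Filter edge tuples by destination and key, mirroring the return contract."""
    matches = [t for t in transitions
               if (d is None or t[1] == d) and (e is None or t[2] == e)]
    if len(matches) > 1:
        if if_more_than_one == "raise":
            raise Exception("There are multiple transitions which satisfy the condition.")
        return matches
    return matches[0] if matches else None


out_edges = [("q0", "q1", "a", {"event": "a"}),
             ("q0", "q2", "a", {"event": "a"}),
             ("q0", "q1", "b", {"event": "b"})]

single = select(out_edges, d="q2")       # exactly one match: a tuple
missing = select(out_edges, d="q3")      # no match: None
several = select(out_edges, e="a", if_more_than_one="return")  # a list
print(single, missing, len(several))
```

Because the result type depends on the number of matches, code that always wants a list may prefer `out_transitions` with an event filter instead.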

in_transitions(s, event=None)

Get all incoming transitions of state s. :param s: :return:

Source code in ml4cps/automata/base.py
def in_transitions(self, s, event=None):
    """
Get all incoming transitions of state s.
    :param s:
    :return:
    """
    if event is None:
        return self._G.in_edges(s, data=True, keys=True)
    else:
        return [e for e in self._G.in_edges(s, data=True, keys=True) if e[3]['event'] == event]

inv(t, q, x, y, z, p)

Invariants. :param t: :param q: :param x: :param y: :param z: :param p:

Source code in ml4cps/automata/base.py
def inv(self, t, q, x, y, z, p):
    """
Invariants.
    :param t:
    :param q:
    :param x:
    :param y:
    :param z:
    :param p:
    """
    pass

is_transition(s, d, e)

Check if a transition (s,d,e) exists in the automaton. :param s: Source. :param d: Destination. :param e: Event. :return:

Source code in ml4cps/automata/base.py
def is_transition(self, s, d, e):
    """
Check if a transition (s,d,e) exists in the automaton.
    :param s: Source.
    :param d: Destination.
    :param e: Event.
    :return:
    """
    transitions = [trans for trans in self.T.values() if trans['source'] == s and
                   trans['destination'] == d and trans['event'] == e]

    is_transition = len(transitions) != 0
    return is_transition

merge(q1, q2)

If two states are compatible, they are merged with the function merge. The transitions of the automaton, the in- and outdegree of the states and the number of transitions happening are adjusted.

Source code in ml4cps/automata/base.py
def merge(self, q1, q2):
    """
    If two states are compatible, they are merged with the function merge. The transitions
    of the automaton, the in- and outdegree of the states and the number of transitions
    happening are adjusted.
    """
    intr = self.in_transitions(q2)
    # outtr_q1 = list(self.out_transitions(q1))
    outtr_q2 = list(self.out_transitions(q2))
    # nx.contracted_nodes(self._G, w, v, copy=False)
    # set event

    for tr in intr:
        if tr[0] != q2:
            self.add_single_transition(tr[0], q1, tr[2], timing=tr[-1]['timing'])
    # if q1=='q0' and q2 == 'q192':
    #     print('found')
    for tr in outtr_q2:
        dest = tr[1]
        if dest == q2:
            dest = q1
        self.add_single_transition(q1, dest, tr[2], timing=tr[-1]['timing'])
    self.remove_state(q2)
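
The rewiring performed by `merge` can be followed on a small example. This is a simplified sketch on `(source, destination, event)` triples, not the library implementation: `merge_states` is a hypothetical helper, the timing data is left out, and dropping duplicate transitions is an assumption about what `add_single_transition` does.

```python
def merge_states(transitions, q1, q2):
    """Redirect every transition touching q2 to q1, keeping order and dropping duplicates."""
    merged = []
    for source, dest, event in transitions:
        edge = (q1 if source == q2 else source, q1 if dest == q2 else dest, event)
        if edge not in merged:
            merged.append(edge)
    return merged


transitions = [("q0", "q1", "a"), ("q0", "q2", "b"), ("q2", "q2", "c"), ("q2", "q3", "d")]
result = merge_states(transitions, "q1", "q2")
for edge in result:
    print(edge)
```

The self-loop on `q2` becomes a self-loop on `q1`, matching the `dest == q2` branch in the listing above, and `q2` no longer appears anywhere in the result.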

out_transitions(s, event=None)

Get all outgoing transitions of state s. :param event: :param s: :return:

Source code in ml4cps/automata/base.py
def out_transitions(self, s, event=None):
    """
Get all outgoing transitions of state s.
    :param event:
    :param s:
    :return:
    """
    if event is None:
        return self._G.out_edges(s, data=True, keys=True)
    else:
        return [e for e in self._G.out_edges(s, data=True, keys=True) if e[3]['event'] == event]

print_state(v)

Returns an HTML-formatted description of the outgoing transitions of a state v.

Parameters:

Name  Type   Description  Default
v     state               required

Returns:

Type    Description
String  Description of the outgoing transitions of the state.

Source code in ml4cps/automata/base.py
def print_state(self, v):
    """Returns an HTML-formatted description of the outgoing transitions of a state v.

    Args:
        v (state): 

    Returns:
        String: Description of the outgoing transitions of the state.
    """
    s = f'<b>{str(v)}</b>'
    for tr in self.out_transitions(v):
        try:
            num_occur = f'[{self.num_occur(tr)}]'
        except:
            num_occur = '/'
        s += f"<br>{tr[2]} -> {tr[1]} {num_occur}"
    return s

remove_transition(source, dest)

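Remove a transition from source to dest. If several parallel transitions connect the two states, only one of them is removed (networkx removes the most recently added edge when no key is given). :param source: Source state. :param dest: Destination state.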

Source code in ml4cps/automata/base.py
def remove_transition(self, source, dest):
    """
    Remove a transition from source to dest. With parallel transitions, only the most recently added one is removed.
    :param source:
    :param dest:
    :return:
    """
    self._G.remove_edge(source, dest)

rename_events(prefix='e_')

Rename events to become e_0, e_1... The old id is stored in the field 'old_symbol' of the event data.

Source code in ml4cps/automata/base.py
def rename_events(self, prefix="e_"):
    """
Rename events to become e_0, e_1... The old id is stored in the field 'old_symbol' of the event data.
    """
    i = 0
    new_events_dict = OrderedDict()
    for k, v in self.Sigma.items():
        new_key = f'{prefix}{i}'
        new_value = v
        if new_value is None:
            new_value = {}
        new_value['old_symbol'] = k
        i += 1
        new_events_dict[new_key] = new_value
        for t in self.T.values():
            if t['event'] == k:
                t['event'] = new_key
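
A standalone sketch of the renaming scheme follows. It is written under assumptions rather than taken from the library: `rename_events` here is a free function, the event alphabet is a plain `OrderedDict`, and transitions are dicts with an "event" entry. Unlike the listing above, the sketch builds the old-to-new mapping first and applies it in a single pass, so an event that has just been renamed is never renamed a second time.

```python
from collections import OrderedDict


def rename_events(sigma, transitions, prefix="e_"):
    """Return a renamed event alphabet and update the transitions in place."""
    mapping = {old: f"{prefix}{i}" for i, old in enumerate(sigma)}
    renamed = OrderedDict()
    for old, data in sigma.items():
        new_data = dict(data or {})      # events without data get an empty dict
        new_data["old_symbol"] = old     # remember the original event name
        renamed[mapping[old]] = new_data
    for t in transitions:
        t["event"] = mapping.get(t["event"], t["event"])
    return renamed


sigma = OrderedDict([("open", None), ("close", {"cost": 2})])
transitions = [{"source": "s0", "destination": "s1", "event": "open"},
               {"source": "s1", "destination": "s0", "event": "close"}]

renamed = rename_events(sigma, transitions)
print(dict(renamed))
print([t["event"] for t in transitions])  # ['e_0', 'e_1']
```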

step(q, x0, t, u)

Simulates one time step of continuous behavior from t to t+dt. The underlying function is solve_ivp with method 'RK23'. :param q: Current discrete state of the model. :param x0: Initial state vector. :param t: Time at start of the step simulation. :param u: Arguments passed..... :return: Time t+dt, value of state at t+dt

Source code in ml4cps/automata/base.py
def step(self, q, x0, t, u):
    """
Simulates one time step of continuous behavior from t to t+dt. The underlying function is solve_ivp with method 'RK23'.
    :param x0: Initial state vector.
    :param t: Time at start of the step simulation.
    :param u: Arguments passed.....
    :return: Time t+dt, value of state at t+dt
    """
    s = solve_ivp(self.flow, t_span=(t, t + self.dt), y0=x0, method='RK23', args=u)
    xc = s.y[:, -1]
    t = s.t[-1]

    xk = self.time_discrete_dynamics(t, q, x0)
    return t, np.concatenate((xc, xk))
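
A minimal sketch of a single integration step with `scipy.integrate.solve_ivp` and the 'RK23' method is given below. It is an illustration under assumptions, not the library code: the flow function, its `rate` parameter and the zero-valued discrete-time state are made up for the example. `solve_ivp` calls the right-hand side as `fun(t, y, *args)`, `args` must be a tuple, and `np.concatenate` expects the arrays wrapped in one sequence.

```python
import numpy as np
from scipy.integrate import solve_ivp


def flow(t, x, rate):
    """Right-hand side dx/dt = -rate * x (exponential decay)."""
    return -rate * x


def step(x0, t, dt, rate):
    """Integrate from t to t + dt and return the final time and state."""
    sol = solve_ivp(flow, t_span=(t, t + dt), y0=x0, method="RK23", args=(rate,))
    return sol.t[-1], sol.y[:, -1]


t1, x1 = step(np.array([1.0]), t=0.0, dt=0.1, rate=2.0)

# Join the time-continuous part with a (hypothetical) discrete-time part.
xk = np.array([0.0])
state = np.concatenate((x1, xk))
print(t1, state)
```

The integrated value can be compared against the analytic solution exp(-rate * dt) to check that the step size and tolerances are adequate for the model at hand.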

try_merge_states(state1, state2, try_fun=None)

Merge state2 into state1 and update transitions.

Source code in ml4cps/automata/base.py
def try_merge_states(self, state1, state2, try_fun=None):
    """Merge state2 into state1 and update transitions."""
    old_G = self._G
    make_final = state2 in self.final_q and state1 not in self.final_q
    make_initial = state2 in self.q0 and state1 not in self.q0
    self._G = nx.contracted_nodes(self._G, state1, state2)
    # make it deterministic again
    events = [x[3]['event'] for x in self.out_transitions(state1)]
    deterministic = len(set(events)) == len(events)

    if deterministic:
        if make_initial:
            self.add_initial_state(state1)
        if make_final:
            self.add_final_state(state1)

        if try_fun is not None and not try_fun(self):
            self._G = old_G
            if make_final:
                self.final_q.pop(state1)
            if make_initial:
                self.q0.pop(state1)
    else:
        self._G = old_G
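
The merge is kept only if the merged state stays deterministic, that is, if no event labels two of its outgoing transitions. A small sketch of that check on `(source, destination, key, data)` tuples, where `is_deterministic` is a hypothetical helper name:

```python
def is_deterministic(out_transitions):
    """True if every outgoing transition carries a distinct event."""
    events = [data["event"] for (_src, _dst, _key, data) in out_transitions]
    return len(set(events)) == len(events)


ok = [("q0", "q1", 0, {"event": "a"}), ("q0", "q2", 0, {"event": "b"})]
clash = [("q0", "q1", 0, {"event": "a"}), ("q0", "q2", 0, {"event": "a"})]

print(is_deterministic(ok), is_deterministic(clash))  # True False
```

In `try_merge_states` a failed check restores the graph saved in `old_G`, so a rejected merge leaves the automaton unchanged.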

CPSComponent

Bases: PythonModel, Simulator

General hybrid system class based on scipy and simpy.

Source code in ml4cps/cps/base.py
class CPSComponent(PythonModel, sim.Simulator):
    """
    General hybrid system class based on scipy and simpy.
    """

    def __init__(self, id, t=0, initial_q=(), initial_xt: list = (), initial_xk: list = (), dt=-1., p=None,
                 cont_state_names=None, discr_state_names=None, discr_output_names=None,
                 cont_output_names=None, unknown_state="Unknown"):
        sim.Simulator.__init__(self)
        PythonModel.__init__(self)

        self._parent_system = None
        self.decision_logic = None
        self.id = id
        self.dt = dt

        # State variables
        self._t = t
        self._q = initial_q
        self._xt = initial_xt
        self._xk = initial_xk

        # Other variables
        self._y = None
        self._u = None
        self._d = ()
        self._p = {} if p is None else p

        # simpy events
        self._pending_events = []
        self._block_event = None

        # List to append the past values
        self._past_p = []
        self._past_t = []
        self._discrete_state_data = []
        self._discrete_output_data = []
        self._continuous_state_data = []
        self._continuous_output_data = []

        # Variable names
        self.cont_state_names = cont_state_names
        self.cont_output_names = cont_output_names
        self.discrete_state_names = discr_state_names
        self.discrete_output_names = discr_output_names

        self.UNKNOWN_STATE = unknown_state

        self.output_y = None
        self.output_d = None



    # def copy(self):
    #     new_copy = CPSComponent(self.id, self._t, initial_q=self._q, initial_xt=self._xt,
    #                             initial_xk = self._xk, dt = self.dt, p=self._p,
    #     cont_state_names = copy.copy(self.cont_state_names), discr_state_names = copy.copy(self.discrete_state_names),
    #                             discr_output_names=copy.copy(self.discrete_output_names),
    #     cont_output_names=copy.copy(self.cont_output_names), unknown_state = copy.copy(self.UNKNOWN_STATE))
    #     return new_copy

    def copy(self):
        new_copy = copy.deepcopy(self)
        return new_copy

    @property
    def parent_system(self):
        return self._parent_system

    @property
    def full_id(self):
        full_id = self.id
        s = self.parent_system
        while s is not None and s.parent_system:
            full_id = s.id + "." + full_id
            s = s.parent_system
        return full_id

    @property
    def overall_system(self):
        s = self
        while s.parent_system:
            s = s.parent_system
        return s

    @property
    def state(self):
        """
        System state is a vector of all state variables.
        :return:
        """
        return self._q, self._xt, self._xk

    @state.setter
    def state(self, value):
        self._q = value[0]
        self._xt = value[1]
        self._xk = value[2]

    @property
    def discrete_state(self):
        return self._q

    @discrete_state.setter
    def discrete_state(self, value):
        self._q = value

    def is_decision(self, state, overall_state):
        return False

    def __str__(self):
        return '{}: {}'.format(self.id, self._q)

    def _step_continuous(self, clock_start, clock_finish):
        return self.__step_continuous(clock_start, clock_finish)

    def simulation_process_simpy(self, env, max_time, verbose=False):
        """
            The function simulates single concurrent thread of one component (CPSComponent).
            Parameters
            ----------
            env : simpy.Environment
                It is simpy environment used to synchronize multiple parallel threads during simulation.
            max_time : float
                It is the maximum time that the simulation will perform.
            verbose : bool
                Should the function print execution logs (default is False).
        """
        env.process(self.discrete_dynamics_process(env, max_time, verbose=verbose))
        env.process(self.continuous_dynamics_process(env, max_time, verbose=verbose))
        env.process(self.discrete_time_dynamics_process(env, max_time, verbose=verbose))

    def discrete_event_dynamics(self, q, xt, xk, p) -> tuple:
        pass

    def discrete_dynamics_process(self, env, max_time, verbose=False):
        # self._env = env
        self._u = self.input(self._q, 0)
        if self._p is None:
            self._p = self.context(self._q, None, None, 0)
        self._past_t = [0]
        self._past_p = [self._p]


        self._discrete_state_data = []
        self._discrete_output_data = []

        while True:
            discr_state = dict(zip(self.discrete_state_names, self._q))
            if self.output_d is None:
                self._d = self.discrete_state
            else:
                self._d = self.output_d(self.discrete_state, self._xt, self._xk)
            self._discrete_state_data.append(dict(Time=env.now, **discr_state, **self._p))
            self._discrete_output_data.append([env.now, *self._d])

            # Stop the simulation if the max time is reached
            if env.now > max_time:
                if verbose:
                    print(f'Stop because of maximum time {max_time} is reached.')
                break
            # Stop if the stopping condition of the overall system is met
            elif self.overall_system.finish_condition():
                if verbose:
                    print('Stop because stop condition of the overall system is met.')
                break

            # DETERMINE THE NEXT EVENT
            # 1. CHECK IF TIMED EVENT
            try:
                event_delay, new_state_value = self.timed_event(*self.state)
            except Exception as ex:
                print(f"{self.id}: Exception during self.timed_event in state {self._q}")
                raise ex

            # 2. IF NOT TIMED THEN WAIT
            if event_delay is None:
                if verbose:
                    print('{}: Waiting in: {}'.format(self.id, self._q))
                if self._block_event is None:
                    self._block_event = env.event()
                    yield self._block_event
                else:
                    raise Exception("{}: State: {}. Exception during block event.".format(self.id, self._q))
                event = self._block_event.value
                self._block_event = None

                self._p['event'] = event
                new_q, xt, xk = self.discrete_event_dynamics(self._q, self._xt, self._xk, self._p)
                if verbose:
                    print(f'{self._q}->{event}->{new_q}')
                if xt is not None:
                    self._xt = xt
                if xk is not None:
                    self._xk = xk

                self._q = new_q
            else:
                if verbose:
                    print('Waiting in: {} for {}. New state: {}'.format(self._q, event_delay, new_state_value))
                yield env.timeout(event_delay)
                self._q = (new_state_value,)

            try:
                self.on_entry(self._q, self._p)
            except Exception as ex:
                print(f"{self.id}: Exception during discrete event dynamics '{self._q}'->")
                raise ex

    def discrete_time_dynamics_process(self, env, max_time, verbose=False):
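        while (self._xk is not None and len(self._xk) > 0) or self.output_y is not None:
            # Record only what exists: the loop can be entered with either a
            # discrete-time state or an output function alone.
            if self._xk is not None:
                self._continuous_state_data.append((self._t, *self._xk))
            if self.output_y is not None:
                self._continuous_output_data.append((self._t, *self.output_y(*self.state)))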

            # Stop the simulation if the max time is reached
            if env.now > max_time:
                if verbose:
                    print(f'Stop because of maximum time {max_time} is reached.')
                break
            # Stop if the stopping condition of the overall system is met
            elif self.overall_system.finish_condition():
                if verbose:
                    print('Stop because stop condition of the overall system is met.')
                break

            yield env.timeout(self.dt)
            xk = self.time_discrete_dynamics(self._q[0], self._p, self._xk, self._u)
            self._xk = xk
            self._t = env.now
            grds = self.guards(self._q[0], self._xk)
            if grds is not None:
                self.apply_sim_event(grds, env)


    def guards(self, q, x):
        pass

    def continuous_dynamics_process(self, env, max_time, verbose=False):
        # try:
        #     if len(self._xt) > 0 or len(self._xk) > 0:  # there is time-continuous state variable
        #         self.__step_continuous(*self.state)
        # except Exception as ex:
        #     print_exc()
        #     raise Exception('Exception during self.__step_continuous in state {}'.format(self._q))

        # EXECUTE THE NEXT EVENT
        if False:
            yield
        # if verbose:
        #     print('--------------------')
        #
        # self._past_t.append(env.now)
        # self._past_p.append(self._p)
        #
        # self._discrete_state_data[-1]['Finish'] = env.now
        # discr_state = dict(zip(self.discrete_state_names, self._q))
        # self._discrete_state_data.append(
        #     dict(Time=env.now, Finish=None, **discr_state, **self._p))
        # self._discrete_output_data.append([env.now, *self._d])
        #
        # if verbose:
        #     print('{}: Time: {} State: {}'.format(self.id, env.now, self._q))
        #
        # if self._xt is not None:
        #     self._continuous_state_data.append([env.now, *self._xt, *self._xk])
        # if self._y is not None:
        #     self._continuous_output_data.append([env.now, *self._y])
        # if self._xt is not None:
        #     self._xt = self.__step_continuous(env.now - clock_start)
        # self._d, self._y = self.output(self._q, 0, self._xt, self._xk, self._u, self._p)
        #
        # print('--------------------')
        # print('{}: Simulation finished'.format(self.id))

    def simulate(self, finish_time, verbose=False, reinitialize=True, save_choices=False):
        """Simulates behavior of the system until the finish_time is reached.

        Parameters
        ----------
        finish_time : Time when simulation finishes.
        verbose : Should the execution log be printed in detail (default is False).
        save_choices : Should the choices for each component be saved to json files after
        the simulation is finished (default is False).
        """
        self.reinitialize(0)

        env = simpy.Environment()
        finish_time = float(finish_time)

        if verbose:
            print('Simulation started: ')

        self.simulation_process_simpy(env, finish_time, verbose)
        env.run(until=finish_time)

        discr_state_data = {}
        discr_output_data = {}
        cont_state_data = {}
        cont_output_data = {}

        try:
            if len(self._xt) > 0 and len(self._xt) > 0 and finish_time > self._t:  # there is time-continuous state variable
                self._step_continuous(self._t, finish_time)
        except Exception as ex:
            print_exc()
            raise Exception('Exception during self._step_continuous in state {}'.format(self._q))

        try:
            self.finish(env.now)

            discr_state_data = pd.DataFrame(self._discrete_state_data).set_index("Time")
            discr_output_data = tools.data_list_to_dataframe(None, self._discrete_output_data,
                                                                   self.discrete_output_names, 'd')
            try:
                cont_state_data = tools.data_list_to_dataframe(self.id, self._continuous_state_data,
                                                               self.cont_state_names, 'x')
            except Exception as ex:
                print_exc()
            cont_output_data = tools.data_list_to_dataframe(self.id, self._continuous_output_data,
                                                                     self.cont_output_names, 'y')
        except Exception as ex:
            print_exc()
            warnings.warn('Simulation failed.')

        if verbose:
            print('Simulation finished.')
        return discr_state_data, discr_output_data, cont_state_data, cont_output_data, env.now

    def get_sim_state(self):
        return self._q, self._p, self._y, self._block_event

    def set_sim_state(self, q, e, p, y, block_event):
        self._q = q
        self._p = p
        self._y = y
        self._block_event = block_event

    def finish_condition(self):
        pass

    def reinitialize(self, t, state=None):
        if state is not None:
            self.state = state

        self._p = self.context(self._q, None, None, 0)
        self._u = self.input(self._q, 0)
        self._past_t = [t]
        self._past_p = [self._p]
        # self.output(self._q, 0, self._xt, self._xk, self._u, self._p)
        self._continuous_state_data = []
        self._continuous_output_data = []
        try:
            discr_state = dict(zip(self.discrete_state_names, self._q))
        except:
            raise Exception('Exception during discrete state initialization')

        self._discrete_state_data = [dict(Time=t, **discr_state, **self._p)]
        self._discrete_output_data = []
        self._block_event = None
        self._pending_events = []
        self._t = t

    def get_execution_data(self):
        try:
            data = pd.DataFrame(self._discrete_state_data) #, columns=['Time', 'Finish', 'State', 'Event'] + list(self._p.keys()))
        except ValueError as ve:
            warnings.warn(str(self._discrete_state_data))
            warnings.warn('Discrete state data not available due to value error.')
            for i, row in enumerate(self._discrete_state_data):
                for k, v in row.items():
                    if not is_scalar(v):
                        print(f"Non-scalar at row {i}, key '{k}'")
                        print("Type:", type(v))
                        print("Length:", len(v) if hasattr(v, "__len__") else "N/A")
                        break
            raise ve

        data['Finish'] = data['Time'].shift(-1)
        data['Duration'] = pd.to_timedelta(data['Finish'] - data['Time']).dt.total_seconds()
        return data

    # def clear_execution_data(self):
    #     self.discrete_state_data = None
    #     data = pd.DataFrame(self._discrete_state_data) #, columns=['Time', 'Finish', 'State', 'Event'] + list(self._p.keys()))
    #     data['Finish'] = data['Time'].shift(-1)
    #     data['Duration'] = pd.to_timedelta(data['Finish'] - data['Time']).dt.total_seconds()
    #     return data

    def update_input(self, u):  # Set i if you want from outside
        self._u = u

    def output_d(self, q, xt, xk):
        return ()

    def output_y(self, q, xt, xk):
        return ()

    def input(self, q, clock) -> tuple:
        pass

    def context(self, q, past_t, past_p, t) -> tuple:
        return dict()

    def time_continuous_dynamics(self, t, xt, xk, q, u, p, y):
        pass

    def time_discrete_dynamics(self, q, p, x, u):
        pass

    def on_entry(self, q, context):
        pass

    def __step_continuous(self, clock_start, clock_finish):
        """
        Integrates the time-continuous dynamics with solve_ivp (method 'RK23') and updates the state in place.
        :param clock_start: Start time of the integration interval.
        :param clock_finish: End time of the integration interval; the solution is sampled every self.dt.
        :return: None. The results are stored in self._xt and self._xk.
        """

        s = solve_ivp(self.time_continuous_dynamics, t_eval=np.arange(clock_start + self.dt, clock_finish + self.dt, self.dt),
                      t_span=(clock_start, clock_finish + self.dt), y0=self._xt, method='RK23',
                      args=(self._xk, self._q, self._u, self._p, self._y))

        self._xk = self.time_discrete_dynamics(self._q, self._p, self._xt, self._u)
        self._xt = s.y[-1]

        # self._continuous_state_data += np.concatenate([s.t.T[None], s.y]).T.tolist()

    def invariants(self, q, clock, xc, xd, y):
        pass

    def timed_event(self, q, xc, xd):
        """
        Calculates if and when the next time event will happen and the new state values.
        :param t: Current time.
        :param q: Discrete-event part of the state.
        :param xc: Time-continuous part of the state.
        :param xd: Time-discrete part of the state.
        :return: Time delay of the timed event, new state value.
        """
        return None, None

    def wait(self, q):
        pass

    def apply_sim_event(self, e, env=None):
        """
        The event e is applied in this component's simpy execution, this means that the process must wait for an event.
        :param e: Event to apply.
        :return:
        """
        if not self._block_event:
            # self._block_event = self._env.now
            self._block_event = env.now
        # if not self._block_event:
        #     raise Exception("Block event is not set, but event is applied.")
        elif self._block_event.triggered:
            # pass
            # if self._e != e:
            raise Exception('Should not happen')
        else:
            self._block_event.succeed(value=e)

    def finish(self, t, **kwargs):
        if self._p is None:
            self._p = kwargs
        else:
            self._p.update(kwargs)
        self._t = t
        # self._e = None
        # self._q = None
        self._past_p.append(self._p)
        # self._discrete_state_data[-1]['Finish'] = t

    def get_alternatives(self, state, system_state):
        return None

    def get_decision_state(self, state, overall_state):
        return state

state property writable

System state is a vector of all state variables.

__step_continuous(clock_start, clock_finish)

Integrates the time-continuous dynamics with solve_ivp (method 'RK23') and updates the state in place.
:param clock_start: Start time of the integration interval.
:param clock_finish: End time of the integration interval; the solution is sampled every self.dt.
:return: None. The results are stored in self._xt and self._xk.

Source code in ml4cps/cps/base.py
def __step_continuous(self, clock_start, clock_finish):
    """
    Integrates the time-continuous dynamics with solve_ivp (method 'RK23') and updates the state in place.
    :param clock_start: Start time of the integration interval.
    :param clock_finish: End time of the integration interval; the solution is sampled every self.dt.
    :return: None. The results are stored in self._xt and self._xk.
    """

    s = solve_ivp(self.time_continuous_dynamics, t_eval=np.arange(clock_start + self.dt, clock_finish + self.dt, self.dt),
                  t_span=(clock_start, clock_finish + self.dt), y0=self._xt, method='RK23',
                  args=(self._xk, self._q, self._u, self._p, self._y))

    self._xk = self.time_discrete_dynamics(self._q, self._p, self._xt, self._u)
    self._xt = s.y[-1]

apply_sim_event(e, env=None)

The event e is applied in this component's simpy execution; this means that the process must wait for an event.
:param e: Event to apply.

Source code in ml4cps/cps/base.py
def apply_sim_event(self, e, env=None):
    """
    The event e is applied in this component's simpy execution, this means that the process must wait for an event.
    :param e: Event to apply.
    :return:
    """
    if not self._block_event:
        # self._block_event = self._env.now
        self._block_event = env.now
    # if not self._block_event:
    #     raise Exception("Block event is not set, but event is applied.")
    elif self._block_event.triggered:
        # pass
        # if self._e != e:
        raise Exception('Should not happen')
    else:
        self._block_event.succeed(value=e)

simulate(finish_time, verbose=False, reinitialize=True, save_choices=False)

Simulates behavior of the system until the finish_time is reached.

Parameters

  • finish_time : Time when simulation finishes.
  • verbose : Should the execution log be printed in detail (default is False).
  • save_choices : Should the choices for each component be saved to json files after the simulation is finished (default is False).

Source code in ml4cps/cps/base.py
def simulate(self, finish_time, verbose=False, reinitialize=True, save_choices=False):
    """Simulates behavior of the system until the finish_time is reached.

    Parameters
    ----------
    finish_time : Time when simulation finishes.
    verbose : Should the execution log be printed in detail (default is False).
    save_choices : Should the choices for each component be saved to json files after
    the simulation is finished (default is False).
    """
    self.reinitialize(0)

    env = simpy.Environment()
    finish_time = float(finish_time)

    if verbose:
        print('Simulation started: ')

    self.simulation_process_simpy(env, finish_time, verbose)
    env.run(until=finish_time)

    discr_state_data = {}
    discr_output_data = {}
    cont_state_data = {}
    cont_output_data = {}

    try:
        if len(self._xt) > 0 and len(self._xt) > 0 and finish_time > self._t:  # there is time-continuous state variable
            self._step_continuous(self._t, finish_time)
    except Exception as ex:
        print_exc()
        raise Exception('Exception during self._step_continuous in state {}'.format(self._q))

    try:
        self.finish(env.now)

        discr_state_data = pd.DataFrame(self._discrete_state_data).set_index("Time")
        discr_output_data = tools.data_list_to_dataframe(None, self._discrete_output_data,
                                                               self.discrete_output_names, 'd')
        try:
            cont_state_data = tools.data_list_to_dataframe(self.id, self._continuous_state_data,
                                                           self.cont_state_names, 'x')
        except Exception as ex:
            print_exc()
        cont_output_data = tools.data_list_to_dataframe(self.id, self._continuous_output_data,
                                                                 self.cont_output_names, 'y')
    except Exception as ex:
        print_exc()
        warnings.warn('Simulation failed.')

    if verbose:
        print('Simulation finished.')
    return discr_state_data, discr_output_data, cont_state_data, cont_output_data, env.now

simulation_process_simpy(env, max_time, verbose=False)

Simulates a single concurrent thread of one component (CPSComponent).

Parameters

  • env (simpy.Environment): The simpy environment used to synchronize multiple parallel threads during simulation.
  • max_time (float): The maximum simulation time.
  • verbose (bool): Whether to print execution logs (default is False).

Source code in ml4cps/cps/base.py
def simulation_process_simpy(self, env, max_time, verbose=False):
    """
        The function simulates single concurrent thread of one component (CPSComponent).
        Parameters
        ----------
        env : simpy.Environment
            It is simpy environment used to synchronize multiple parallel threads during simulation.
        max_time : float
            It is the maximum time that the simulation will perform.
        verbose : bool
            Should the function print execution logs (default is False).
    """
    env.process(self.discrete_dynamics_process(env, max_time, verbose=verbose))
    env.process(self.continuous_dynamics_process(env, max_time, verbose=verbose))
    env.process(self.discrete_time_dynamics_process(env, max_time, verbose=verbose))

timed_event(q, xc, xd)

Calculates if and when the next timed event will happen and the new state values.
:param q: Discrete-event part of the state.
:param xc: Time-continuous part of the state.
:param xd: Time-discrete part of the state.
:return: Time delay of the timed event, new state value.

Source code in ml4cps/cps/base.py
def timed_event(self, q, xc, xd):
    """
    Calculates if and when the next time event will happen and the new state values.
    :param t: Current time.
    :param q: Discrete-event part of the state.
    :param xc: Time-continuous part of the state.
    :param xd: Time-discrete part of the state.
    :return: Time delay of the timed event, new state value.
    """
    return None, None

automata

ml4cps.automata.learn

The module provides learning algorithms for creation of different kinds of automata.

Authors: - Nemanja Hranisavljevic, hranisan@hsu-hh.de, nemanja@ai4cps.com - Tom Westermann, tom.westermann@hsu-hh.de, tom@ai4cps.com

FnDetectChangePoints(xout, udout, xout_shifts)

Detects change points in output and input variables, filters them, and constructs a trace structure.

This function analyzes the provided output (xout) and input (udout) data to detect change points using the findChangePoints function. It processes both output and input variables, aggregates and filters the detected change points, and returns a structured trace dictionary containing the processed data and change point information.

Args:
  • xout (np.ndarray): Output data array of shape (n_samples, n_outputs).
  • udout (np.ndarray): Input data array of shape (n_samples, n_inputs).
  • xout_shifts (np.ndarray): Shifted output data array, used for further processing.

Returns:
  • dict: A dictionary with the following keys:
      - 'x': Filtered output data array.
      - 'xs': Filtered shifted output data array.
      - 'chpoints': Array of global change points detected across all variables.
      - 'chpoints_per_var': List of arrays, each containing change points for a specific variable.
      - 'ud': Filtered input data array.
      - 'labels_num': Empty list (reserved for numeric labels).
      - 'labels_trace': Empty list (reserved for trace labels).

Notes:
  • Relies on global variables: num_var, num_ud, max_deriv, and chp_depths.
  • Uses external functions: findChangePoints and filterChangePoints.
  • The function is intended for time-series or sequential data analysis where significant changes in variable values must be detected.

FnShiftAndDiff(xout, udout, norm_coeff, num_var, num_ud, max_deriv, Ts)

Normalizes and processes state and input data by generating shifted versions and computing derivatives.

This function performs the following operations:
  1. Normalizes the state output xout using the provided normalization coefficients.
  2. Generates shifted duplicates of xout up to the specified maximum derivative order.
  3. Computes numerical derivatives of xout up to max_deriv order and appends them to the state data.
  4. Strips the initial entries from xout and the shifted data to account for the derivative computation.
  5. Normalizes the input data udout (if present) and strips initial entries to match the processed state data.

Args:
  • xout (np.ndarray): State output data of shape (n_samples, num_var).
  • udout (np.ndarray): Input data of shape (n_samples, num_ud).
  • norm_coeff (np.ndarray): Normalization coefficients of shape (num_var + num_ud, 1).
  • num_var (int): Number of state variables.
  • num_ud (int): Number of input variables.
  • max_deriv (int): Maximum order of derivatives to compute.
  • Ts (float): Sampling time interval.

Returns:
  • Tuple[np.ndarray, np.ndarray, pd.DataFrame]:
      - xout (np.ndarray): Processed and augmented state data with derivatives, shape (n_samples - max_deriv, ...).
      - udout (np.ndarray): Normalized input data, shape (n_samples - max_deriv, num_ud).
      - xout_shifts (pd.DataFrame): Shifted duplicates of the normalized state data, shape (n_samples - max_deriv, ...).

Notes:
  • The function assumes that xout and udout are NumPy arrays and that xout has at least max_deriv rows.
  • The normalization coefficients should be provided for both state and input variables.
  • The function uses zero-padding for shifted and derivative computations.
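Steps 3 and 4 can be illustrated with a minimal, standalone sketch. It is not the library's implementation: the helper name `finite_differences`, the use of backward differences, and the plain-list data layout are assumptions made for illustration, and normalization and shifting are left out.

```python
def finite_differences(x, max_deriv, ts):
    """Return [x, dx, d2x, ...] with the first max_deriv samples stripped.

    Hypothetical helper: derivatives are approximated by backward
    differences divided by the sampling interval ts, and the first entry
    of each derivative column is zero-padded.
    """
    columns = [list(x)]
    for _ in range(max_deriv):
        prev = columns[-1]
        diff = [(b - a) / ts for a, b in zip(prev, prev[1:])]
        columns.append([0.0] + diff)  # zero-pad to keep the length
    # The first max_deriv rows are contaminated by the zero-padding,
    # so they are dropped from every column.
    return [col[max_deriv:] for col in columns]


# Samples of x(t) = t**2 at t = 0..4 with Ts = 1.
x, dx, d2x = finite_differences([0, 1, 4, 9, 16], max_deriv=2, ts=1.0)
print(x)    # [4, 9, 16]
print(dx)   # [3.0, 5.0, 7.0]
print(d2x)  # [2.0, 2.0, 2.0]
```

The example shows why the leading entries are stripped: the padded zero distorts the first values of each higher-order difference, and after dropping max_deriv rows every remaining row is consistent.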

FnTraceToTrainingData(trace, num_var, num_ud, useTime)

Converts a trace dictionary into training data suitable for machine learning models.

Parameters:
  • trace (dict): A dictionary containing the following keys:
      - 'x' (np.ndarray): Array of system variables over time (shape: [timesteps, num_var]).
      - 'ud' (np.ndarray): Array of user-defined variables over time (shape: [timesteps, num_ud]).
      - 'labels_trace' (np.ndarray): Array of state labels for each segment.
      - 'chpoints' (list or np.ndarray): Indices indicating change points (state switches) in the trace.
  • num_var (int): Number of system variables to include in the feature vector.
  • num_ud (int): Number of user-defined variables to include in the feature vector.
  • useTime (bool): If True, includes the time since the last state switch as a feature.

Returns:
  • X (np.ndarray): Feature matrix where each row corresponds to a time step and contains:
      - Current state label
      - System variables
      - User-defined variables (if num_ud > 0)
      - Time since last state switch (if useTime is True)
  • Y (np.ndarray): Array of next state labels (class labels) for each feature vector in X.
  • states (np.ndarray): Array of state labels for each time step in the trace.

Notes:
  • The function skips the last time step in the trace for feature construction, as it cannot form a (X, Y) pair.
  • The function assumes that the trace data is properly aligned and that 'chpoints' and 'labels_trace' are consistent.

build_pta(data, event_col='event', boundaries=1)

Builds a Prefix Tree Acceptor (PTA) from a collection of event sequences.

This function constructs a PTA by iterating through each sequence of events in the provided data. It adds states and transitions to the automaton based on the observed event sequences, and sets the depth, in-degree, and out-degree of the states. The PTA is useful for learning automata from positive examples.

Args:
  • data (iterable): An iterable of event sequences. Each sequence can be a pandas DataFrame, pandas Series, or string. If a DataFrame, it should contain at least a time column and an event column.
  • event_col (str, optional): The name of the column containing event labels in the input DataFrame or Series. Defaults to 'event'.
  • boundaries (int or dict, optional): Not currently used in the function, but intended for handling event boundaries or timing constraints. Defaults to 1.

Returns:
  • Automaton: The constructed Prefix Tree Acceptor representing the input event sequences.

Notes:
  • The function expects the presence of an Automaton class with methods for adding states, transitions, and final states.
  • If a sequence is a string, it is converted to a pandas Series of characters.
  • Timing information (dt) is calculated as the difference between consecutive time steps.
  • The function skips empty sequences.
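The idea of a prefix tree acceptor can be sketched without the library. The following is only an illustration: `build_prefix_tree`, the integer state names, and the dictionary-based representation are hypothetical and do not reflect the Automaton class or the timing information that build_pta records.

```python
def build_prefix_tree(sequences):
    """Build a prefix tree from event sequences (strings or lists of labels).

    Hypothetical helper: states are integers, 0 is the root, and every
    distinct prefix of the input sequences gets its own state.
    """
    transitions = {}      # (state, event) -> next state
    final_states = set()  # states in which some sequence ends
    depth = {0: 0}        # state -> distance from the root
    next_state = 1

    for seq in sequences:
        if len(seq) == 0:  # empty sequences are skipped
            continue
        state = 0
        for event in seq:
            key = (state, event)
            if key not in transitions:
                transitions[key] = next_state
                depth[next_state] = depth[state] + 1
                next_state += 1
            state = transitions[key]
        final_states.add(state)
    return transitions, final_states, depth


transitions, final_states, depth = build_prefix_tree(["ab", "abc", "ad", ""])
print(len(transitions))      # 4
print(sorted(final_states))  # [2, 3, 4]
```

Sequences that share a prefix ("ab", "abc", "ad") reuse the same states for that prefix, which is what makes the tree a compact starting point for state-merging algorithms.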

computeDistance(der)

Computes a distance metric over a sliding window for a given derivative array.

For each position in the input array der, the function calculates the sum of absolute differences between two windows of size windowSize before and after the current position, after normalizing each window by subtracting its first element. The result is an array of distances.

Parameters:
  • der (np.ndarray): The input array (e.g., derivative values) over which to compute the distance.

Returns:
  • np.ndarray: An array containing the computed distances for each valid position.
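A minimal sketch of the windowed distance described above, written with plain lists. The function name `window_distance`, the explicit `window_size` argument (the library reads a global windowSize), and the exact range of valid positions are assumptions for illustration.

```python
def window_distance(der, window_size):
    """Sum of absolute differences between the windows before and after i.

    Hypothetical helper: each window is shifted so that its first element
    is zero, which makes the comparison insensitive to a constant offset.
    """
    distances = []
    for i in range(window_size, len(der) - window_size + 1):
        before = der[i - window_size:i]
        after = der[i:i + window_size]
        before = [v - before[0] for v in before]
        after = [v - after[0] for v in after]
        distances.append(sum(abs(b - a) for b, a in zip(before, after)))
    return distances


# Flat signal followed by a ramp: the distance rises where the shape changes.
print(window_distance([0, 0, 0, 0, 0, 0, 1, 2, 3], 3))  # [0, 1, 3, 3]
# A pure ramp has the same shape in both windows everywhere.
print(window_distance([0, 1, 2, 3, 4, 5], 2))           # [0, 0, 0]
```

Peaks in the resulting sequence mark positions where the local shape of the signal differs before and after, which is how they can serve as change point candidates.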

filterChangePoints(xout, udout, xout_shifts, chpoints, chp_var)

Filters and synchronizes detected changepoints in time series data across multiple variables.

This function processes global and local changepoint indices to ensure consistency and remove redundant or closely spaced changepoints. It updates the provided data arrays and changepoint lists accordingly.

Args:
  • xout (np.ndarray): Output variable time series data (samples x variables).
  • udout (np.ndarray): Input variable time series data (samples x variables), or an empty array if not used.
  • xout_shifts (np.ndarray): Shifted output variable data (samples x variables).
  • chpoints (list or np.ndarray): List of global changepoint indices.
  • chp_var (list of np.ndarray): List containing arrays of changepoint indices for each variable.

Returns:
  • tuple:
      - xout (np.ndarray): Filtered output variable data.
      - udout (np.ndarray): Filtered input variable data.
      - xout_shifts (np.ndarray): Filtered shifted output variable data.
      - chpoints (np.ndarray): Filtered and synchronized global changepoint indices.
      - chp_var (list of np.ndarray): Updated list of changepoint indices for each variable.

Notes:
  • Uses global variables: windowSize, num_var, num_ud.
  • Assumes that changepoints are sorted and that there are at least two changepoints.
  • The function modifies chp_var in place.

filterindx(indx, windw)

Filters out indices from the input array that are within a specified window of each other.

Given a sorted array of indices, this function removes any index that is within windw distance from its predecessor, keeping only the first occurrence in each window.

Parameters:
  • indx (array-like): A sorted array or list of integer indices.
  • windw (int): The minimum allowed distance between consecutive indices.

Returns:
  • numpy.ndarray: The filtered array of indices, where no two indices are within windw of each other.

Example:
    >>> filterindx(np.array([1, 2, 3, 10, 12]), 2)
    array([ 1, 10, 12])
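The filtering rule can be re-created in a few lines of plain Python. This is a sketch, not the library's code: `filter_close_indices` is a hypothetical name, and comparing each index with its immediate predecessor using `>=` is an assumption chosen so that the documented example is reproduced.

```python
def filter_close_indices(indices, window):
    """Keep an index only if it is at least `window` away from its predecessor.

    Hypothetical helper operating on a sorted list of integers.
    """
    if len(indices) == 0:
        return []
    kept = [indices[0]]  # the first index is always kept
    for prev, cur in zip(indices, indices[1:]):
        if cur - prev >= window:
            kept.append(cur)
    return kept


result = filter_close_indices([1, 2, 3, 10, 12], 2)
print(result)  # [1, 10, 12]
```

Under this rule, 2 and 3 are dropped because each follows its predecessor by 1, while 12 is kept because it is exactly 2 away from 10.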

findChangePoints(xout, depth, starting, ending, max_depth)

Recursively detects change points in a multi-dimensional signal using a hierarchical approach.

This function analyzes a segment of the input array xout at a given depth (dimension), computes a distance metric to identify potential change points (peaks), and then recursively searches for further change points in subsegments at deeper levels. The recursion stops when the maximum depth is reached or the segment is too small.

Parameters

  • xout (np.ndarray): The input array containing the signal or features to analyze. Expected shape is (n_samples, n_features).
  • depth (int): The current depth (dimension) being analyzed.
  • starting (int): The starting index of the segment to analyze.
  • ending (int): The ending index (exclusive) of the segment to analyze.
  • max_depth (int): The maximum depth (dimension) to analyze.

Returns

  • np.ndarray: An array of indices representing detected change points within the specified segment.

Notes

  • Uses global variables windowSize and chp_depths for windowing and tracking change points per depth.
  • Utilizes computeDistance, find_peaks, and filterindx helper functions.
  • At the top level (depth == 0), prepends and appends boundary indices to the result.

rpni(positive_samples, negative_samples)

Implements the RPNI (Regular Positive and Negative Inference) algorithm for learning a DFA from positive and negative samples.

Args:
  • positive_samples (Iterable[str]): A collection of strings that should be accepted by the learned DFA.
  • negative_samples (Iterable[str]): A collection of strings that should be rejected by the learned DFA.

Returns:
  • DFA: A deterministic finite automaton (DFA) that accepts all positive samples and rejects all negative samples.

Notes:
  • The function first constructs a Prefix Tree Acceptor (PTA) from the positive samples.
  • It then attempts to merge states in the DFA, ensuring that no negative sample is accepted after each merge.
  • The merging process is guided by the constraint that all negative samples must be rejected.

simple_learn_from_event_logs(data, initial=True, count_repetition=True, verbose=False)

Simple algorithm to learn a timed automaton from event log data.

This function constructs a timed automaton by iterating over sequences of timestamped events. Each event sequence is treated as a trace, and transitions are created between states based on event occurrences and their timing. States are determined by the emitted events, optionally including repetition counts. The automaton can be initialized with an explicit initial state, and transitions can account for repeated events.

Args:
  • data (list or pandas.Series): A list of event sequences, where each sequence is a pandas Series with timestamps as indices and event labels as values.
  • initial (bool, optional): If True, adds an explicit 'initial' state to the automaton. Defaults to True.
  • count_repetition (bool, optional): If True, distinguishes states and transitions by counting consecutive repetitions of the same event. Defaults to True.
  • verbose (bool, optional): If True, prints detailed information about the learning process. Defaults to False.

Returns:
  • Automaton: The learned timed automaton object.

Notes:
  • Each sequence in data should be a pandas Series indexed by timestamps.
  • If a sequence contains fewer than two events, it is skipped.
  • The function assumes the existence of an Automaton class with add_initial_state and add_single_transition methods.
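The core loop of such a learner can be sketched in plain Python. The sketch below is an assumption-laden illustration rather than the library's algorithm: `learn_transitions` is a hypothetical name, traces are lists of (timestamp, event) pairs instead of pandas Series, states are named after the last emitted event, and the `initial` and `count_repetition` options are omitted.

```python
from collections import defaultdict


def learn_transitions(traces):
    """Collect timed transitions from timestamped event traces.

    Hypothetical helper: returns a dict mapping
    (source_state, event, destination_state) to the list of observed
    time deltas between the two consecutive events.
    """
    timings = defaultdict(list)
    for trace in traces:
        if len(trace) < 2:  # traces with fewer than two events are skipped
            continue
        for (t_prev, e_prev), (t_cur, e_cur) in zip(trace, trace[1:]):
            timings[(e_prev, e_cur, e_cur)].append(t_cur - t_prev)
    return dict(timings)


traces = [
    [(0.0, "start"), (2.0, "heat"), (5.0, "stop")],
    [(0.0, "start"), (3.0, "heat")],
    [(0.0, "start")],  # too short, skipped
]
model = learn_transitions(traces)
print(model[("start", "heat", "heat")])  # [2.0, 3.0]
print(model[("heat", "stop", "stop")])   # [3.0]
```

Keeping every observed time delta per transition, rather than a single value, leaves room for fitting a timing distribution or interval afterwards.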

simple_learn_from_signal_updates(data, sig_names, initial=True, verbose=False)

Learns a timed automaton from sequences of signal updates.

This function processes a list of dataframes, each representing a sequence of signal updates over time. For each sequence, it constructs states based on the values of the specified signals and adds transitions to an Automaton object whenever a signal value changes.

Args:
  • data (list of pandas.DataFrame): List of dataframes, each containing time-stamped signal updates. The first column is assumed to be the time column, and subsequent columns correspond to signal names.
  • sig_names (list of str): List of signal names to track and use for state construction.
  • initial (bool, optional): If True, adds an initial state to the automaton for each sequence. Defaults to True.
  • verbose (bool, optional): If True, prints detailed information about the learning process. Defaults to False.

Returns:
  • Automaton: The learned automaton with states and transitions based on the observed signal updates.

Notes:
  • Each state is represented as a dictionary mapping signal names to their current values.
  • Transitions are added only when all signal values are set (i.e., not None).
  • The event label for each transition is formatted as '<-'.
  • The transition is annotated with the time difference (delta_t) between consecutive events.

simple_learn_from_signal_vectors(data, drop_no_changes=True, verbose=False)

Learns a timed automaton from a list of signal vector dataframes.

This function processes sequences of signal vectors (as pandas DataFrames), detects changes in the specified signal columns, and constructs a timed automaton by adding transitions for each detected event.

Args:
  • data (list of pandas.DataFrame): List of DataFrames, each representing a sequence of signal vectors. The first column is assumed to be the time column, and the remaining columns are signal values.
  • sig_names (list of str): List of column names in the DataFrame that correspond to the signals to be considered for state transitions. (Described in the docstring, but not a parameter in the signature shown.)
  • drop_no_changes (bool, optional): If True, rows where no signal changes occur are dropped before processing. The docstring states a default of False, while the signature default is True.
  • verbose (bool, optional): If True, prints detailed information about the learning process. Default is False.

Returns:
  • Automaton: An Automaton object constructed from the observed transitions in the input data.

Notes:
  • Each transition in the automaton corresponds to a change in the signal vector, with the event label representing the difference between consecutive signal vectors and the transition time as the time delta.
  • The function assumes that the Automaton class and its add_single_transition method are defined elsewhere.

ml4cps.automata.base

The module provides a class Automaton which inherits CPSComponent and implements the dynamics of different kinds of automata.

Authors: - Nemanja Hranisavljevic, hranisan@hsu-hh.de, nemanja@ai4cps.com - Tom Westermann, tom.westermann@hsu-hh.de, tom@ai4cps.com

Automaton

Bases: CPSComponent

Automaton class is the main class for modeling various kinds of hybrid systems.

num_modes property

Returns the number of modes in the automaton. :return: number of states.

num_transitions property

Returns the number of transitions in the automaton. :return: number of transitions.

state property writable

Automata discrete state is uni-variate. :return:

__init__(states=None, transitions=None, unknown_state='raise', id='', initial_q=(), initial_r=None, final_q=(), super_states=(), decision_states=(), **kwargs)

Class initialization from lists of elements.
:param states: Discrete states / modes of continuous behavior.
:param events: The events that trigger state transitions.
:param transitions: The transition information. If a collection of dicts, each dict should contain "source", "dest" and "event"; the other attributes are added as data of that transition. Alternatively, a collection of tuples of the form (source, event, dest, *) can be used.
:param unknown_state: The name of unknown states during "play in"; if "raise", an exception will be raised.

__str__()

String representation of the automaton. :return:

add_final_state(states)

Add final state(s) of the automaton. :param states: States to add.

add_initial_state(states)

Add initial state(s) of the automaton. :param states: States to add.

add_state(new_state, **kwargs)

Add state to the automaton. :param new_state: State to be added.

add_state_data(s, d)

Add data to a state s of the automaton.
:param s: State.
:param d: Data to be added to s.

add_states_from(new_state, **kwargs)

Add multiple states to the automaton. :param new_state: States to be added.

add_transition(s, d, e, **other)

Add a single transition. :param s: Source state. :param d: Destination state. :param e: Event.

add_transitions_from(list_of_tuples, **other)

Add multiple transitions. :param list_of_tuples: List of transitions in the form (source_state, destination_state, event, ......).

flow(q, p, x, u)

Flow equation gives derivative of the continuous variables. :param q: Current discrete state of the model. :param p: Stochastic parameters generated on entry to state current_q. :param x: Current continuous state of the model. :param u: Calculated internal i signals. :return: Derivative of the continuous state x.

get_num_in(q)

Returns the number of incoming transitions of state q in the automaton. :return: number of transitions.

get_num_out(q)

Returns the number of outgoing transitions of state q in the automaton. :return: number of transitions.

get_transition(s, d=None, e=None, if_more_than_one='raise')

Get all transitions with source state s and destination state d. If e is provided, the returned list contains only transitions whose event is e. :param if_more_than_one: :param s: Source state. :param d: Destination state. :param e: Event. :return:

in_transitions(s, event=None)

Get all incoming transitions of state s. :param s: :return:

inv(t, q, x, y, z, p)

Invariants. :param t: :param q: :param x: :param y: :param z: :param p:

is_transition(s, d, e)

Check if a transition (s,d,e) exists in the automaton. :param s: Source. :param d: Destination. :param e: Event. :return:

merge(q1, q2)

Merges two compatible states q1 and q2. The transitions of the automaton, the in- and out-degrees of the states, and the transition occurrence counts are adjusted accordingly.
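A minimal sketch of what merging two states involves, assuming transitions are kept in a plain dictionary that maps (source, event, dest) to an occurrence count. The functions `merge_states` and `degrees` are hypothetical and do not reflect the Automaton internals.

```python
def merge_states(transitions, q1, q2):
    """Redirect every transition touching q2 to q1 and sum duplicate counts."""
    merged = {}
    for (source, event, dest), count in transitions.items():
        key = (q1 if source == q2 else source, event, q1 if dest == q2 else dest)
        merged[key] = merged.get(key, 0) + count
    return merged


def degrees(transitions, q):
    """Return (in-degree, out-degree) of state q."""
    n_in = sum(1 for (_, _, dest) in transitions if dest == q)
    n_out = sum(1 for (source, _, _) in transitions if source == q)
    return n_in, n_out


counts = {
    ("a", "e1", "b"): 3,
    ("a", "e1", "c"): 2,
    ("b", "e2", "d"): 1,
    ("c", "e2", "d"): 4,
}
merged = merge_states(counts, "b", "c")
print(merged)
print(degrees(merged, "b"))
```

After the merge, transitions that became identical are collapsed into one entry whose count is the sum of the originals, which is why the degrees of the surviving state have to be recomputed.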

out_transitions(s, event=None)

Get all outgoing transitions of state s. :param event: :param s: :return:

print_state(v)

Prints outgoing transitions of a state v.

Parameters:
    v: State. Required.

Returns:
    String: Description of the outgoing transitions of the state.

remove_transition(source, dest)

Remove the transition(s) from source to dest. :param source: :param dest: :return:

rename_events(prefix='e_')

Rename events to become e_0, e_1... The old id is stored in the field 'old_symbol' of the state data.

step(q, x0, t, u)

Simulates one time step of continuous behavior from t to t+dt. The underlying function is solve_ivp with method 'RK23'. :param x0: Initial state vector. :param t: Time at start of the step simulation. :param u: Arguments passed..... :return: Time t+dt, value of state at t+dt
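For orientation, a single integration step with SciPy's `solve_ivp` and the 'RK23' method might look as follows. The dynamics, the explicit `dt` argument and the function names are assumptions made for this sketch; the actual step method takes (q, x0, t, u) and its internals are not shown in this reference.

```python
import math

from scipy.integrate import solve_ivp


def flow(t, x, u):
    """Example dynamics (assumed): first-order decay towards the input u."""
    return [u - x[0]]


def step(x0, t, dt, u):
    """Integrate the flow from t to t + dt and return (t + dt, x(t + dt))."""
    sol = solve_ivp(flow, (t, t + dt), x0, method="RK23", args=(u,))
    return sol.t[-1], sol.y[:, -1]


t_next, x_next = step([1.0], t=0.0, dt=1.0, u=0.0)
print(t_next, x_next[0], math.exp(-1.0))
```

The printed state should be close to exp(-1), the exact solution of this example, within the default solver tolerances.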

try_merge_states(state1, state2, try_fun=None)

Merge state2 into state1 and update transitions.

cps

ml4cps.cps.base

The module provides base classes to represent the dynamics of cyber-physical systems (CPS): CPS and CPSComponent.

Author: Nemanja Hranisavljevic, hranisan@hsu-hh.de

CPS

CPS class represents a cyber-physical system. It is used to model the hierarchy in the system as each CPS can contain a mixture of other CPS and CPS components. The leaves of the hierarchy are only CPS component objects which define their dynamics and communication with other components.

Attributes:
    id (str): Identifier of the object.
    com (OrderedDict): Property. A collection of components.

parent_system property writable

Gets the parent system by accessing _parent_system private attribute.

Returns:
    CPS: The parent system.

__getitem__(key)

Gets the component with the given key in the collection.

Parameters:
    key (string): The key of the component to look up. Must be a hashable type (e.g., int, str). Required.

Returns:
    (CPS, CPSComponent): Returned component or subsystem.

__init__(sys_id, components)

Initializes CPS with the given attributes.

Parameters:
    sys_id (str): ID of the system. Required.
    components (iterable): Child components of the system. Required.

get_all_components(exclude=None)

Returns a list of all child components (not only direct) except those with ids possibly provided in exclude.

Parameters

exclude : iterable
    A list of component ids to exclude in the returned list.

get_components(exclude=None)

Returns a list of all direct child components except those with ids possibly provided in exclude.

Parameters

exclude : iterable
    A list of component ids to exclude in the returned list (default is None).
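The difference between direct and transitive child lookup can be sketched with a small tree. The `Node` class and both functions are hypothetical stand-ins, not the CPS API; in particular, skipping the whole subtree of an excluded id and listing intermediate subsystems in the result are assumptions of this sketch.

```python
class Node:
    """Hypothetical stand-in for a CPS (has children) or a component (leaf)."""

    def __init__(self, node_id, children=()):
        self.id = node_id
        self.children = list(children)


def direct_children(system, exclude=None):
    """Return the direct children whose ids are not excluded."""
    excluded = set(exclude or ())
    return [c for c in system.children if c.id not in excluded]


def all_children(system, exclude=None):
    """Return all descendants depth-first, skipping excluded ids."""
    found = []
    for child in direct_children(system, exclude):
        found.append(child)
        found.extend(all_children(child, exclude))
    return found


plant = Node("plant", [Node("line1", [Node("robot"), Node("conveyor")]), Node("buffer")])

direct_ids = [c.id for c in direct_children(plant)]
all_ids = [c.id for c in all_children(plant)]
filtered_ids = [c.id for c in all_children(plant, exclude=["robot"])]
print(direct_ids, all_ids, filtered_ids)
```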

reinitialize(t=0, state=None)

Re-initializes the CPS components of this CPS with the given state values.
:param t: Current time to set on the components.
:param state: State of the CPS (its components), given as a dictionary of dictionaries ... of tuples (according to the CPS hierarchy). Each tuple is the concatenation of the values of the discrete-event state variables, the time-continuous state variables and the time-discrete state variables.

:return:

set_child_component(id, com)

Set component with the id.

Parameters

id : str
    ID of the component to add.
com : (CPS, CPSComponent)
    Component or subsystem to add.

simulate(finish_time, verbose=False, save_choices=False)

Simulates behaviour of the system until the finish_time is reached.

Parameters

finish_time
    Time when simulation finishes.
verbose
    Should the execution log be printed in detail (default is False).
save_choices
    Should the choices for each component be saved to json files after the simulation is finished (default is False).

CPSComponent

Bases: PythonModel, Simulator

General hybrid system class based on scipy and simpy.

state property writable

System state is a vector of all state variables. :return:

__step_continuous(clock_start, clock_finish)

Simulates one time step of continuous behavior from t to t+dt. The underlying function is solve_ivp with method 'RK23'. :param x0: Initial state vector. :param t: Time at start of the step simulation. :param u: Arguments passed..... :return: Time t+dt, value of state at t+dt

apply_sim_event(e, env=None)

Applies the event e in this component's simpy execution, which means that the process must wait for an event. :param e: Event to apply. :return:

simulate(finish_time, verbose=False, reinitialize=True, save_choices=False)

Simulates behavior of the system until the finish_time is reached.

Parameters

finish_time
    Time when simulation finishes.
verbose
    Should the execution log be printed in detail (default is False).
save_choices
    Should the choices for each component be saved to json files after the simulation is finished (default is False).

simulation_process_simpy(env, max_time, verbose=False)

Simulates a single concurrent thread of one component (CPSComponent).

Parameters

env : simpy.Environment
    The simpy environment used to synchronize multiple parallel threads during simulation.
max_time : float
    The maximum time that the simulation will run.
verbose : bool
    Should the function print execution logs (default is False).

timed_event(q, xc, xd)

Calculates if and when the next time event will happen and the new state values. :param t: Current time. :param q: Discrete-event part of the state. :param xc: Time-continuous part of the state. :param xd: Time-discrete part of the state. :return: Time delay of the timed event, new state value.

ml4cps.cps.sim

Simulation.

Author: - Nemanja Hranisavljevic, hranisan@hsu-hh.de, nemanja@ai4cps.com

Simulator

stop_condition(t)

Simulation stop condition which can be overridden by the subclasses.

discretization

ml4cps.discretization.discretization

The module provides classes for the discretization of time series: TimeSeriesDiscretizer, EqualWidthDiscretizer, EqualFrequencyDiscretizer, KMeansDiscretizer and MultivariateKMeansDiscretizer.

Author: Nemanja Hranisavljevic, hranisan@hsu-hh.de

EqualFrequencyDiscretizer

Bases: TimeSeriesDiscretizer

A class that implements the equal-frequency interval (EFI) discretization method. It divides each variable (column) into intervals such that each interval contains approximately the same number of data points.

discretize(df, return_str=False, append_discr=None)

Discretize data into equal-frequency intervals. :param df: Data to discretize. :param return_str: Whether to return discretized data as a concatenated string per row. :return: Discretized data.

train(data, number_of_intervals=10)

Estimate model parameters, thresholds that divide each variable into equal-frequency intervals. :param data: Data to calculate model parameters from. :param number_of_intervals: Number of equal-frequency intervals per variable. :return:
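Equal-frequency discretization of a single variable can be sketched with the standard library alone. The function names and the choice of the "inclusive" quantile method are assumptions for illustration; the class above works on DataFrame columns and may compute its thresholds differently.

```python
from bisect import bisect_right
from statistics import quantiles


def train_equal_frequency(values, number_of_intervals=10):
    """Return the inner thresholds that split `values` into equal-frequency bins."""
    return quantiles(values, n=number_of_intervals, method="inclusive")


def discretize(values, thresholds):
    """Map each value to the index of the interval it falls into."""
    return [bisect_right(thresholds, v) for v in values]


data = [1, 2, 3, 4, 5, 6, 7, 8, 100]
thresholds = train_equal_frequency(data, number_of_intervals=3)
labels = discretize(data, thresholds)
print(thresholds, labels)
```

The outlier 100 does not distort the bins: each of the three intervals still receives three of the nine values.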

EqualWidthDiscretizer

Bases: TimeSeriesDiscretizer

A class that implements the equal-width interval (EWI) discretization method. It divides the range of every variable (column) of the input data into a predefined number of equal-width intervals.

discretize(df, return_str=False, append_discr=None)

Discretize data into equal-width intervals. :param df: Data to discretize. :return: Discretized data.

train(data, number_of_intervals=10)

Estimate model parameters, thresholds that divide each variable into equal-width intervals. :param data: Data to calculate model parameters from. :param number_of_intervals: Number of equal-width intervals per variable. :return:
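For comparison, equal-width thresholds for one variable can be computed directly from its minimum and maximum. The function names are hypothetical and the sketch ignores DataFrame handling; assigning a value that lies exactly on a threshold to the upper interval is an assumption.

```python
from bisect import bisect_right


def train_equal_width(values, number_of_intervals=10):
    """Return the inner thresholds of equal-width intervals over the value range."""
    low, high = min(values), max(values)
    width = (high - low) / number_of_intervals
    return [low + i * width for i in range(1, number_of_intervals)]


def discretize(values, thresholds):
    """Map each value to the index of the interval it falls into."""
    return [bisect_right(thresholds, v) for v in values]


data = [0.0, 1.0, 2.0, 9.0, 10.0]
thresholds = train_equal_width(data, number_of_intervals=5)
labels = discretize(data, thresholds)
print(thresholds, labels)
```

Unlike the equal-frequency variant, intervals in the middle of the range can stay empty when the data is concentrated near the ends.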

KMeansDiscretizer

Bases: TimeSeriesDiscretizer

A class that implements K-means discretization. It clusters the data values of each variable into a predefined number of clusters using the K-means algorithm.

discretize(df, return_str=False, append_discr=None)

Discretize data into clusters determined by K-means. :param df: DataFrame to discretize. :param return_str: Whether to return discretized data as concatenated strings. :return: Discretized data.

train(data, number_of_clusters_per_var=10)

Train the K-means discretizer by fitting K-means models for each variable (column) in the data. :param data: List of DataFrames to calculate model parameters from. :param number_of_clusters_per_var: Number of clusters (intervals) per variable.
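The per-variable clustering idea can be sketched with a small one-dimensional version of Lloyd's algorithm. This is a teaching sketch with a deterministic initialization chosen for reproducibility; which K-means implementation the class uses is not stated in this reference, and the function names are hypothetical.

```python
def kmeans_1d(values, k, iterations=20):
    """Plain Lloyd's algorithm for one variable; returns sorted cluster centers."""
    ordered = sorted(values)
    # Deterministic initialization: spread the initial centers over the sorted data.
    centers = [ordered[(2 * i + 1) * len(ordered) // (2 * k)] for i in range(k)]
    for _ in range(iterations):
        clusters = [[] for _ in range(k)]
        for v in values:
            nearest = min(range(k), key=lambda i: abs(v - centers[i]))
            clusters[nearest].append(v)
        # Keep the old center if a cluster ends up empty.
        centers = [sum(c) / len(c) if c else centers[i] for i, c in enumerate(clusters)]
    return sorted(centers)


def discretize(values, centers):
    """Label each value with the index of its nearest center."""
    return [min(range(len(centers)), key=lambda i: abs(v - centers[i])) for v in values]


data = [1.0, 1.2, 0.8, 5.0, 5.2, 4.8, 9.0, 9.1]
centers = kmeans_1d(data, k=3)
labels = discretize(data, centers)
print(centers, labels)
```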

MultivariateKMeansDiscretizer

Bases: TimeSeriesDiscretizer

A class that implements multivariate K-means discretization. It clusters the data based on all variables (columns) together into a predefined number of clusters.

discretize(df, return_str=False, append_discr=None)

Discretize data into clusters determined by K-means. :param df: DataFrame to discretize. :param return_str: Whether to return discretized data as concatenated strings. :return: Discretized data (cluster labels).

train(data, number_of_clusters=10)

Train the K-means discretizer by fitting a single K-means model for all variables. :param data: List of DataFrames to calculate model parameters from. :param number_of_clusters: Number of clusters.

TimeSeriesDiscretizer

Abstract class that encapsulates methods used for the discretization of time series.

vis

ml4cps.vis

The module provides methods to visualize various kinds of data, such as time series or automata graphs.

Authors: - Nemanja Hranisavljevic, hranisan@hsu-hh.de, nemanja@ai4cps.com - Tom Westermann, tom.westermann@hsu-hh.de, tom@ai4cps.com

add_time_frames_to_subplots(fig, trace_indices=None, frame_duration_ms=80, transition_duration_ms=0, redraw=False, keep_tail=True, sort_each_trace_by_x=True, frame_stride=1, slider_prefix='Time: ', title_prefix=None)

Add animation frames to a subplot figure where each subplot has one scatter trace and x represents time.

Parameters

fig : go.Figure
    A subplot figure.
trace_indices : sequence[int] | None
    Which traces to animate. If None, animate all traces in fig.data.
frame_duration_ms : int
    Duration of each frame in milliseconds.
transition_duration_ms : int
    Duration of frame transition in milliseconds.
redraw : bool
    Passed to Plotly animation args.
keep_tail : bool
    If True, each frame shows all points up to time t. If False, each frame shows only the current point for each trace.
sort_each_trace_by_x : bool
    If True, sort each trace by its own x values before animating.
frame_stride : int
    Use every n-th point to reduce number of frames.
slider_prefix : str
    Prefix shown before current slider value.
title_prefix : str | None
    Optional title prefix, e.g. "t = ".

Returns

go.Figure
    The same figure, updated in place with frames and controls.
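The effect of keep_tail and frame_stride can be illustrated without Plotly by computing which points each frame would contain. The helper `build_frames` is hypothetical; the real function builds go.Frame objects and slider controls, which are omitted here.

```python
def build_frames(x, y, keep_tail=True, frame_stride=1):
    """Return one (x_values, y_values) pair per animation frame."""
    frames = []
    for end in range(0, len(x), frame_stride):
        # With a tail, every frame starts at the first point; otherwise only
        # the current point is shown.
        start = 0 if keep_tail else end
        frames.append((x[start:end + 1], y[start:end + 1]))
    return frames


t = [0, 1, 2, 3, 4]
v = [10, 11, 12, 13, 14]
tail = build_frames(t, v, keep_tail=True, frame_stride=2)
point = build_frames(t, v, keep_tail=False, frame_stride=2)
print(tail)
print(point)
```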

export_plotly_frames_animation_cumulative(fig, output_path, fps=5, width=1200, height=800, scale=2, cleanup_frames=True, frame_dir=None)

Export a Plotly animated figure by cumulatively applying go.Frame updates. This better matches Plotly's animation behavior for partial frame updates.

plot2d(df, x=None, y=None, mode='markers', hovercolumns=None, figure=False, **args)

Creates a 2D scatter or line plot using Plotly based on the provided DataFrame columns.

Parameters:
    df (pd.DataFrame): The input DataFrame containing the data to plot.
    x (str, optional): The column name to use for the x-axis.
    y (str, optional): The column name to use for the y-axis.
    mode (str, optional): The Plotly scatter mode (e.g., 'markers', 'lines'). Defaults to 'markers'.
    hovercolumns (list of str, optional): List of column names to include in the hover tooltip.
    figure (bool, optional): If True, returns a Plotly Figure object; otherwise, returns a Scatter trace. Defaults to False.
    **args: Additional keyword arguments passed to the Plotly Scatter constructor.
Returns:
    plotly.graph_objs._scatter.Scatter or plotly.graph_objs._figure.Figure: The generated Plotly Scatter trace or Figure, depending on the 'figure' parameter.
Example:
    plot2d(df, x='feature1', y='feature2', hovercolumns=['label'], mode='markers', figure=True)

plot3d(df, x=None, y=None, z=None, mode='markers', hovercolumns=None, **args)

Creates a 3D scatter plot using Plotly's Scatter3d, with customizable axes, hover information, and additional plot arguments.

Parameters:
    df (pandas.DataFrame): The data source containing columns for x, y, z, and optional hover data.
    x (str, optional): The column name in df to use for the x-axis.
    y (str, optional): The column name in df to use for the y-axis.
    z (str, optional): The column name in df to use for the z-axis.
    mode (str, optional): Plotly scatter mode (e.g., 'markers', 'lines'). Defaults to 'markers'.
    hovercolumns (list of str, optional): List of column names in df to include in the hover tooltip.
    **args: Additional keyword arguments passed to go.Scatter3d.
Returns:
    plotly.graph_objs._scatter3d.Scatter3d: A Plotly 3D scatter plot object configured with the specified data and options.

plot_2d_contour_from_fun(fun, rangex=None, rangey=None, th=50, **kwargs)

Plots a 2D contour of a function over a specified range.

Parameters:
    fun (callable): A function that takes a 2D array of shape (n_points, 2) and returns a 1D array of function values.
    rangex (tuple, optional): The range for the x-axis as (min, max). Defaults to (-5, 5) if not provided.
    rangey (tuple, optional): The range for the y-axis as (min, max). Defaults to (-5, 5) if not provided.
    th (int, optional): Unused parameter, kept for compatibility. Defaults to 50.
    **kwargs: Additional keyword arguments passed to the plotly.graph_objs.Contour constructor.
Returns:
    plotly.graph_objs.Contour: A Plotly contour plot object representing the function values over the specified range.

plot_cps(cps, dash_id=None, node_labels=False, edge_labels=True, node_size=40, node_font_size=20, edge_font_size=16, edge_text_max_width=None, output='cyto', dash_port=8050, height='100%', minZoom=0.5, maxZoom=2, **kwargs)

Plots all the components of a CPS in the same figure.
:param cps: CPS to plot.
:param node_labels: Should node labels be plotted.
:param edge_labels: Should edge labels be plotted.
:param node_size: The size of the nodes in the figure.
:param node_font_size: The font size of the node labels.
:param edge_font_size: The font size of the edge labels.
:param edge_text_max_width: Max width of the edge labels.
:param output: Should the output be returned as a dash.Cytoscape component ("cyto"), or should a dash server be run ("dash").
:param dash_port: If a temporary dash server is run, which port to use.
:param kwargs: Other parameters are forwarded to the Cytoscape component.
:return:

plot_cps_component(cps, id=None, node_labels=False, center_node_labels=False, event_label=True, show_transition_freq=False, show_transition_timing=False, font_size=6, edge_font_size=6, edge_text_max_width=None, init_label=False, limit_interval_precision=None, show_transition_data=False, transition_data_keys=True, node_size=20, output='cyto', dash_port=8050, min_zoom=0.5, split_edges_diff_event=False, max_zoom=1, min_edge_thickness=0.1, max_edge_thickness=4, freq_as_edge_thickness=False, color='black', title_text=None, layout_name='breadthfirst', layout_spacingFactor=1, hide_nodes=None)

Visualizes a component of a Cyber-Physical System (CPS) as a graph using Dash Cytoscape.
This function generates a graphical representation of the discrete states and transitions of a CPS,
with various customization options for node and edge appearance, labels, and output format.
The visualization can be rendered as Dash Cytoscape elements, in a Dash app, or as a notebook widget.
Parameters:
    cps: object
        The CPS object containing discrete states, transitions, and related data.
    id: str, optional
        The unique identifier for the Cytoscape component (default: "graph").
    node_labels: bool, optional
        Whether to display labels on nodes (default: False).
    center_node_labels: bool, optional
        Whether to center node labels (default: False).
    edge_labels: bool, optional
        Whether to display labels on edges (default: True).
    show_transition_freq: bool, optional
        Whether to show transition frequency on edge labels (default: False).
    edge_font_size: int, optional
        Font size for edge labels (default: 6).
    edge_text_max_width: int or None, optional
        Maximum width for edge label text wrapping (default: None).
    init_label: bool, optional
        Whether to label initial state transitions as 'init' (default: False).
    show_transition_data: bool or list, optional
        Whether to display additional transition data on edge labels. If a list, only specified keys are shown (default: False).
    node_size: int, optional
        Size of the nodes (default: 20).
    output: str, optional
        Output format: "cyto" (Dash Cytoscape Div), "elements" (raw elements), "notebook" (inline Dash app), or "dash" (Dash app in browser) (default: "cyto").
    dash_port: int, optional
        Port for running the Dash app (default: 8050).
    min_zoom: float, optional
        Minimum zoom level for the Cytoscape component (default: 0.5).
    max_zoom: float, optional
        Maximum zoom level for the Cytoscape component (default: 1).
    min_edge_thickness: float, optional
        Minimum edge thickness for frequency-based scaling (default: 0.1).
    max_edge_thickness: float, optional
        Maximum edge thickness for frequency-based scaling (default: 4).
    freq_as_edge_thickness: bool, optional
        Whether to scale edge thickness based on transition frequency (default: False).
    color: str, optional
        Color for nodes and edges (default: "black"). If "hsu", uses a preset color.
    title_text: str or Dash component, optional
        Title text or component to display above the graph (default: None).
Returns:
    Dash component, dict, or Dash app:
        - If output == "cyto": returns a Dash html.Div containing the Cytoscape graph.
        - If output == "elements": returns a dict with 'nodes' and 'edges'.
        - If output == "notebook": runs and displays a Dash app inline (for Jupyter).
        - If output == "dash": runs a Dash app in the browser and returns the app instance.
Notes:
    - Requires Dash, dash_cytoscape, dash_bootstrap_components, and pandas.
    - The function supports interactive modals for displaying timing data on states and transitions.
    - Threading is used to launch the Dash app in browser mode without blocking the main program.
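The frequency-based edge scaling controlled by freq_as_edge_thickness, min_edge_thickness and max_edge_thickness could work roughly as below. Linear min-max scaling is an assumption made for this sketch, and the helper `edge_thickness` is hypothetical; the function's actual mapping is not documented here.

```python
def edge_thickness(freq, min_freq, max_freq, min_thickness=0.1, max_thickness=4):
    """Linearly map a transition frequency to an edge thickness."""
    if max_freq == min_freq:
        return max_thickness  # all edges are equally frequent
    ratio = (freq - min_freq) / (max_freq - min_freq)
    return min_thickness + ratio * (max_thickness - min_thickness)


frequencies = {("a", "b"): 1, ("b", "c"): 5, ("c", "a"): 9}
low, high = min(frequencies.values()), max(frequencies.values())
thickness = {edge: edge_thickness(f, low, high) for edge, f in frequencies.items()}
print(thickness)
```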
Layout options for layout_name:

1. 'grid' → Places nodes in a simple rectangular grid.
2. 'random' → Randomly positions nodes; useful for testing.
3. 'circle' → Arranges nodes evenly around a circle.
4. 'concentric' → Places nodes in concentric circles, often by degree or weight.
5. 'breadthfirst' → Hierarchical layout (tree-like), good for state machines or DAGs. Optional params: directed=True, padding=
6. 'cose' → Force-directed layout (spring simulation). Great for organic graphs. Optional params: idealEdgeLength, nodeRepulsion, gravity, numIter
7. 'cose-bilkent' → Improved force-directed layout with better stability and aesthetics. (Requires: cyto.load_extra_layouts())
8. 'cola' → Constraint-based force-directed layout; handles larger graphs well. (Requires: cyto.load_extra_layouts())
9. 'euler' → Physically simulated layout; looks natural and dynamic. (Requires: cyto.load_extra_layouts())
10. 'avsdf' → Circular layout optimized to reduce edge crossings. (Requires: cyto.load_extra_layouts())
11. 'spread' → Distributes disconnected components evenly across space. (Requires: cyto.load_extra_layouts())
12. 'klay' → Layered (hierarchical) layout, excellent for flowcharts or process models. (Requires: cyto.load_extra_layouts())
13. 'dagre' → Directed acyclic graph layout, ideal for workflows and automata. (Requires: cyto.load_extra_layouts()) Optional params: rankDir='TB' (top-bottom), 'LR' (left-right), etc.

plot_cps_plotly(cps, layout='kamada_kawai', marker_size=20, node_positions=None, show_events=True, show_num_occur=False, show_state_label=True, font_size=10, plot_self_transitions=True, use_previos_node_positions=False, **kwargs)

Visualizes a Cyber-Physical System (CPS) state-transition graph using Plotly. This function generates an interactive Plotly figure representing the states and transitions of a CPS. Nodes represent system states, and edges represent transitions. Various layout algorithms and display options are supported.

Args:
    cps: The CPS object containing the state-transition graph. Must have attributes _G (networkx graph), get_transitions(), print_state(), num_occur(), and previous_node_positions.
    layout (str, optional): Layout algorithm for node positioning. Options are "dot", "spectral", "kamada_kawai" (the default in the signature above), or "fruchterman_reingold".
    marker_size (int, optional): Size of the node markers. Default is 20.
    node_positions (dict, optional): Precomputed node positions as a dictionary {node: (x, y)}. If None, positions are computed using the selected layout.
    show_events (bool, optional): Whether to display event labels on transitions. Default is True.
    show_num_occur (bool, optional): Whether to display the number of occurrences for each transition. Default is False.
    show_state_label (bool, optional): Whether to display state labels on nodes. Default is True.
    font_size (int, optional): Font size for transition/event labels. Default is 10.
    plot_self_transitions (bool, optional): Whether to plot self-loop transitions. Default is True.
    use_previos_node_positions (bool, optional): If True and node_positions is None, reuse positions from cps.previous_node_positions. Default is False.
    **kwargs: Additional keyword arguments passed to the layout function (e.g., for networkx layouts).
Returns:
    plotly.graph_objs.Figure: A Plotly figure object representing the CPS state-transition graph.
Notes:
    - Requires Plotly, NetworkX, and pydotplus (for "dot" layout).
    - The CPS object must provide the required methods and attributes as described above.
    - Edge and node styling can be further customized by modifying the function.

plot_dash_frames(graph_frames, dash_port=8050)

Launches an interactive Dash web application to visualize a sequence of graph frames with a slider for manual frame selection.

Args:
    graph_frames (list): A list of Dash components (e.g., Cytoscape graphs) representing different frames to display.
    dash_port (int, optional): The port number on which to run the Dash server. Defaults to 8050.
Returns:
    dash.Dash: The Dash application instance.
Side Effects:
    - Starts a Dash server in a separate thread.
    - Opens the default web browser to display the Dash app.
    - Waits for user input before returning.
Notes:
    - The app displays the first frame by default and allows users to select other frames using a slider.
    - The function blocks until the user presses Enter in the console.

plot_execution_tree(graph, nodes_to_color, color, font_size=30)

Plots a system execution tree as a graph, where the horizontal position of nodes corresponds to their timestamps and the tree branches vertically.

Args:
    graph (networkx.DiGraph): A directed graph where each node represents a system state, and edges represent transitions. Each node should have a 'label' (str) and 'weight' (int) attribute. Node names must be timestamp strings in the format "%d/%m/%Y, %H:%M:%S".
    nodes_to_color (list): List of node identifiers (timestamp strings) to be highlighted with a specific color.
    color (str): The color to use for highlighting nodes in nodes_to_color.
    font_size (int, optional): Font size for node labels in the visualization. Defaults to 30.
Returns:
    cyto.Cytoscape: A Dash Cytoscape object representing the execution tree visualization, with nodes positioned by timestamp and colored as specified.
Notes:
    - The function assumes the first node in graph.nodes is the starting node.
    - Node positions are determined by the time difference from the start node (x-axis) and their 'weight' attribute (y-axis).
    - Nodes in nodes_to_color are colored with the specified color; all others are gray.
    - Requires the cyto (Dash Cytoscape) library and datetime module.
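The positioning rule from the notes above (x from the time difference to the start node, y from the node weight) can be sketched with the datetime module. The helper `node_positions`, the plain dictionary of weights and the use of seconds as the x unit are assumptions; the function itself reads these values from a networkx graph.

```python
from datetime import datetime

TIMESTAMP_FORMAT = "%d/%m/%Y, %H:%M:%S"


def node_positions(weights):
    """Map timestamp-named nodes to (x, y): seconds since the first node, weight."""
    names = list(weights)
    start = datetime.strptime(names[0], TIMESTAMP_FORMAT)
    positions = {}
    for name in names:
        elapsed = (datetime.strptime(name, TIMESTAMP_FORMAT) - start).total_seconds()
        positions[name] = (elapsed, weights[name])
    return positions


weights = {
    "01/03/2024, 08:00:00": 0,
    "01/03/2024, 08:00:30": 1,
    "01/03/2024, 08:02:00": 2,
}
positions = node_positions(weights)
print(positions)
```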

plot_state_transitions(ta, state, obs=None)

Visualizes the outgoing state transitions from a given state in a timed automaton, along with associated observation data.

Parameters:
    ta: An object representing the timed automaton, expected to have an out_transitions(state) method that returns transitions from the given state.
    state: The current state for which outgoing transitions and associated observations are to be visualized.
    obs (optional): A pandas DataFrame containing observation data. Must include at least the columns 'Mode', 'q_next', 'Duration', 'Time', and optionally 'Vergussgruppe', 'HID', 'ChipID', 'Order', and 'ArtNr'. If None, the function raises NotImplemented.
Returns:
    fig: A Plotly figure object containing subplots for each outgoing transition. For each transition, the function displays:
        - A scatter plot of observation durations over time, grouped by 'Vergussgruppe'.
        - A histogram of durations for each 'Vergussgruppe'.
      The subplots are arranged with shared axes and appropriate titles for each transition.
Raises:
    NotImplemented: If obs is None.
Notes:
    - The function expects certain columns to exist in the obs DataFrame. If missing, default values are assigned.
    - Colors for different 'Vergussgruppe' groups are assigned from DEFAULT_PLOTLY_COLORS.
    - The function uses Plotly's make_subplots, go.Scatter, and go.Histogram for visualization.

plot_stateflow(stateflow, color_mapping=None, state_col='State', task_col='Task', bar_height=12, start_column='Start', finish_column='Finish', return_figure=False, description_col='Description', idle_states=None)

Visualizes state transitions over time for one or more tasks/stations as a Gantt-like interactive timeline.

Parameters:
- stateflow (DataFrame or dict): DataFrame with state transitions, or a dictionary of DataFrames per station.
- color_mapping (dict, optional): Mapping of state names to colors. If None, default colors are used.
- state_col (str): Column name indicating the state (default: 'State').
- bar_height (int): Height of the timeline bars (default: 12).
- start_column (str): Column name with start timestamps (default: 'Start').
- finish_column (str): Column name with end timestamps (default: 'Finish').
- return_figure (bool): If True, returns a Plotly Figure. Otherwise, returns a list of Plotly traces.
- description_col (str or list): Column(s) to include in the hover tooltip (default: 'Description').
- idle_states (str or list): State(s) to exclude from the plot (e.g., 'IDLE').

Returns:
- Plotly Figure or list of traces, depending on return_figure.

Example

fig = plot_stateflow(df, state_col='Mode', start_column='StartTime', finish_column='EndTime', return_figure=True)
fig.show()

This function is ideal for visualizing process flows, machine states, or event-based logs with time intervals.

plot_timeseries(data, timestamp=None, mode_data=None, discrete=False, title=None, use_columns=None, height=None, limit_num_points=None, names=None, xaxis_title=None, customdata=None, iterate_colors=True, y_title_font_size=14, opacity=1.0, vertical_spacing=0.005, sharey=False, bounds=None, plot_only_changes=False, yAxisLabelOffset=False, marker_size=4, showlegend=False, mode_height=0.2, x_title=None, **kwargs)

Plot one or more time-series datasets as vertically stacked Plotly subplots.

Each selected column is plotted in its own subplot. If multiple datasets are provided, each subplot contains one trace per dataset for that column. An optional mode_data subplot can be added above the signal subplots.

Parameters

data : pandas.DataFrame | dict | array-like | list[pandas.DataFrame | dict | array-like]
    One dataset or a list of datasets to plot. Non-DataFrame inputs are converted to pandas DataFrames.
timestamp : str | int | None, default None
    Column name or column index to set as the index for each dataset before plotting. Ignored if None.
mode_data : pandas.DataFrame | pandas.Series | numpy.ndarray | list[...] | None, default None
    Optional mode/categorical signal(s) to plot in a dedicated top subplot. For DataFrame inputs, a Mode column is required. If a Time column exists, it is used for the x-axis; otherwise the index is used.
discrete : bool, default False
    If True, plot signal traces as markers only. If False, use the Plotly mode specified by mode.
title : str | None, default None
    Figure title.
use_columns : sequence[str] | None, default None
    Columns to plot. If None, all columns from the first dataset are used.
height : int | None, default None
    Figure height in pixels. If None, a default height is computed from the number of subplots.
limit_num_points : int | None, default None
    Maximum number of points to plot from each trace. If None or negative, all available points are used.
names : sequence[str] | None, default None
    Dataset names used in legend entries. If omitted, traces are named by their dataset index.
xaxis_title : str | None, default None
    Title applied to all x-axes.
customdata : pandas.DataFrame | array-like | None, default None
    Extra per-point hover data. This is attached to each signal trace and added to the hover tooltip. Row count should align with plotted points.
iterate_colors : bool, default True
    If True, cycle through Plotly default colors per dataset. If False, use the first default color for every dataset.
y_title_font_size : int, default 14
    Font size used for subplot y-axis titles.
opacity : float, default 1.0
    Opacity applied to generated trace colors.
vertical_spacing : float, default 0.005
    Vertical spacing between subplot rows.
sharey : bool, default False
    If True, share y-axes across subplot rows.
bounds : tuple[pandas.DataFrame, pandas.DataFrame] | None, default None
    Pair of (upper_bounds, lower_bounds) DataFrames used to draw a filled band for each plotted column.
plot_only_changes : bool, default False
    Only relevant when discrete=True. If True, only points where the signal changes are plotted, along with the first point.
yAxisLabelOffset : bool, default False
    If True, progressively increases y-axis title standoff on lower subplots.
marker_size : int, default 4
    Marker size for discrete and mode traces.
showlegend : bool, default False
    Whether to display the figure legend.
mode_height : float, default 0.2
    Relative height of the mode_data subplot, when present.
x_title : str | None, default None
    Title applied only to the bottom x-axis.
**kwargs
    Additional keyword arguments forwarded to go.Scatter.

Returns

plotly.graph_objects.Figure
    The generated Plotly figure.

Raises

ValueError
    If input data is missing, mode_height is invalid, or a mode trace is missing its required columns.
KeyError
    If one of the requested columns is not found in a dataset or bounds data.

Notes
  • The returned figure includes the selected columns only.
  • If mode_data is provided, it occupies the first subplot row.
  • customdata is filtered per trace when plot_only_changes=True.
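The change-only filtering described for plot_only_changes can be illustrated with plain pandas. This is a minimal sketch, not the library's implementation: the helper name keep_changes and the sample data are hypothetical, and it assumes the hover data has one row per signal sample.

```python
import pandas as pd


def keep_changes(signal: pd.Series) -> pd.Series:
    """Return a boolean mask that is True at the first sample and
    wherever the value differs from the previous sample."""
    mask = signal.ne(signal.shift())
    mask.iloc[0] = True  # always keep the first point
    return mask


# Hypothetical discrete signal and per-point hover data (one row per sample).
signal = pd.Series([0, 0, 1, 1, 1, 2, 0, 0])
hover = pd.DataFrame({"note": list("abcdefgh")})

mask = keep_changes(signal)
reduced_signal = signal[mask]
# Apply the same mask to the hover data so rows stay aligned with the points.
reduced_hover = hover[mask.to_numpy()]
```

Applying one mask to both the signal and its hover rows is what keeps the tooltip aligned with the plotted points after filtering.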

plot_transition(self, s, d)

Plots the transition histogram between two states.

Retrieves the transition data between the source state s and destination state d, and generates a Plotly figure visualizing the timing distribution of the transition. The plot includes a title, an annotation indicating the transition, and a histogram of the transition timings.

Args:
  • s: The source state identifier.
  • d: The destination state identifier.

Returns:

plotly.graph_objs._figure.Figure: A Plotly Figure object containing the histogram of transition timings.

view_graphviz(self, layout='dot', marker_size=20, node_positions=None, show_events=True, show_num_occur=False, show_state_label=True, font_size=10, plot_self_transitions=True, use_previos_node_positions=False, **kwargs)

Visualizes the internal graph structure using Graphviz and returns a pydot graph object.

Parameters:
  • layout (str): The layout algorithm to use for node positioning (default: "dot").
  • marker_size (int): Size of the node markers in the visualization (default: 20).
  • node_positions (dict or None): Optional dictionary mapping node names to (x, y) positions. If None, positions are computed.
  • show_events (bool): Whether to display event labels on transitions (default: True).
  • show_num_occur (bool): Whether to display the number of occurrences for each transition (default: False).
  • show_state_label (bool): Whether to display state labels on nodes (default: True).
  • font_size (int): Font size for labels and annotations (default: 10).
  • plot_self_transitions (bool): Whether to plot self-loop transitions (default: True).
  • use_previos_node_positions (bool): Whether to reuse previously computed node positions (default: False).
  • **kwargs: Additional keyword arguments for customization.

Returns:

pdp.Dot: A pydot graph object representing the visualized graph.

Notes:
  • Node positions are either computed using Graphviz or taken from the provided/previous positions.
  • Annotations for transitions can include event names and/or occurrence counts.
  • The function prepares the graph for further rendering or export, but does not display it directly.

tools

ml4cps.tools

Various methods to transform data.

binary_ordinal_encode(column, order)

Encodes a pandas Series with binary ordinal encoding based on the specified order.

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
column | Series | The column to encode. | required
order | list | The ordered list of unique values in the column. | required

Returns:

Type | Description
--- | ---
pd.DataFrame | The binary ordinal encoded DataFrame for the given column.
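The exact output layout of binary_ordinal_encode is not documented here. One common reading of "binary ordinal encoding" is a thermometer code, where each level above the lowest gets a 0/1 column that is 1 when the value is at or above that level. The sketch below illustrates that reading only; the function name thermometer_encode, the column naming scheme, and the sample data are assumptions, and the library's result may differ.

```python
import pandas as pd


def thermometer_encode(column: pd.Series, order: list) -> pd.DataFrame:
    """Illustrative thermometer (cumulative binary) encoding.

    One 0/1 column is produced per level except the lowest; a cell is 1
    when the value's rank in `order` is at least that level's rank.
    """
    rank = {value: i for i, value in enumerate(order)}
    ranks = column.map(rank)  # values missing from `order` become NaN
    data = {
        f"{column.name}>={level}": (ranks >= i).astype(int)
        for i, level in enumerate(order)
        if i > 0
    }
    return pd.DataFrame(data, index=column.index)


# Hypothetical example column with three ordered levels.
sizes = pd.Series(["low", "high", "medium", "low"], name="size")
encoded = thermometer_encode(sizes, ["low", "medium", "high"])
```

With this scheme the lowest level is encoded as all zeros, so the ordering is preserved without adding a redundant column.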

generate_random_walk(start_values, steps=100)

Generates a random walk process for multiple variables.

Parameters:
  • start_values (list): A list of starting values for each variable.
  • steps (int): Number of steps in the random walk.

Returns:
  • pd.DataFrame: DataFrame containing the random walk process for each variable.
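For orientation, a random walk of this kind can be built by cumulatively summing random increments. The sketch below is not the library's code: the function name random_walk_sketch, the seed argument, the standard-normal step distribution, the column names, and the choice to make the first row equal the start values are all assumptions.

```python
import numpy as np
import pandas as pd


def random_walk_sketch(start_values, steps=100, seed=None):
    """Illustrative multi-variable random walk (assumes steps >= 1).

    Each column starts at its start value and accumulates independent
    standard-normal increments; the result has `steps` rows.
    """
    rng = np.random.default_rng(seed)
    start = np.asarray(start_values, dtype=float)
    increments = rng.standard_normal((steps, start.size))
    increments[0] = 0.0  # first row reproduces the start values
    walk = start + np.cumsum(increments, axis=0)
    columns = [f"var_{i}" for i in range(start.size)]
    return pd.DataFrame(walk, columns=columns)


walk = random_walk_sketch([0.0, 10.0], steps=5, seed=0)
```

Passing a seed makes the sketch reproducible, which is convenient when the walk is used as test data.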

remove_timestamps_without_change(data, sig_names=None)

Removes timestamps where no values changed in comparison to the previous timestamp.
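The idea can be sketched with pandas by comparing each row with the previous one. This is an illustration under assumptions, not the library's implementation: the helper name drop_unchanged_rows is hypothetical, timestamps are assumed to be the DataFrame index, and sig_names is assumed to restrict the comparison to the listed columns.

```python
import pandas as pd


def drop_unchanged_rows(data: pd.DataFrame, sig_names=None) -> pd.DataFrame:
    """Keep the first row and every row where at least one of the
    compared columns differs from the previous row."""
    cols = list(data.columns) if sig_names is None else list(sig_names)
    # The first row is compared against NaN, so it always counts as changed.
    changed = data[cols].ne(data[cols].shift()).any(axis=1)
    return data[changed]


# Hypothetical signals indexed by timestamp (seconds).
data = pd.DataFrame(
    {"valve": [0, 0, 1, 1, 1], "pump": [1, 1, 1, 0, 0]},
    index=[0.0, 0.1, 0.2, 0.3, 0.4],
)
reduced = drop_unchanged_rows(data)
valve_only = drop_unchanged_rows(data, sig_names=["valve"])
```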