Elma
An event loop manager for embedded systems
manager.cc
1 #include <stdexcept>
2 #include <iostream>
3 #include <algorithm>
4 #include <mutex>
5 #include "elma.h"
6 
7 namespace elma {
8 
14  Process& process,
15  high_resolution_clock::duration period) {
16 
17  process._period = period;
18  _processes.push_back(&process);
19  process._manager_ptr = this;
20 
21  if (Priority_min > process._priority || process._priority > Priority_max ){
22  throw Exception("Priority must be between -5(low priority) and 15(high priority)");
23  }
24 
25  return *this;
26 
27  }
28 
32  process.stop();
33  auto i = remove_if(_processes.begin(), _processes.end(), [&](Process * p) {
34  return p == &process;
35  });
36  _processes.erase(i, _processes.end());
37  return *this;
38  }
39 
44  _channels[channel.name()] = &channel;
45  return *this;
46  }
47 
50  Channel& Manager::channel(string name) {
51  if ( _channels.find(name) != _channels.end() ) {
52  return *(_channels[name]);
53  } else {
54  throw Exception("Tried to access an unregistered or non-existant channel.");
55  }
56  }
57 
68  Manager& Manager::watch(std::string event_name, std::function<void(Event&)> handler) {
69  event_handlers[event_name].push_back(handler);
70  return *this;
71  }
72 
85  Manager& Manager::emit(const Event& event) {
86  Event e = event; // make a copy so we can change propagation
87  if ( event_handlers.find(event.name()) != event_handlers.end() ) {
88  for ( auto handler : event_handlers[event.name()] ) {
89  if ( e.propagate() ) {
90  handler(e);
91  }
92  }
93  }
94  return *this;
95  }
96 
100  Manager& Manager::all(std::function< void(Process&) > f) {
101  for(auto process_ptr : _processes) {
102  f(*process_ptr);
103  }
104  return *this;
105  }
106 
110  sort_processes();
111  return all([](Process& p) { p._init();});
112  }
113 
117  _running = true;
118  return all([this](Process& p) { p._start(_elapsed) ;});
119  }
120 
124  _running = false;
125  return all([](Process& p) { p._stop(); });
126  }
127 
131  _update_mutex.lock();
132  _client.process_responses();
133  all([this](Process& p) {
134  if ( _elapsed >= p.last_update() + p.period() ) {
135  p._update(_elapsed);
136  }
137  });
138  _update_mutex.unlock();
139  return *this;
140  }
141 
145 
146  std::sort(_processes.begin(), _processes.end(),[](const Process * lhs, const Process * rhs){
147  return lhs->_priority > rhs->_priority;
148  });
149 
150  return *this;
151  }
152 
157  _simulated_time = true;
158  return *this;
159  };
160 
166  _simulated_time = false;
167  return *this;
168  };
169 
176  Manager& Manager::set_priority(Process& process, int priority) {
177 
178  if (Priority_min <= priority && priority <= Priority_max ){
179  process._priority = priority;
180  sort_processes();
181  } else {
182  throw Exception("Priority must be between -5(low priority) and 15(high priority)");
183  }
184  return *this;
185  }
186 
190  Manager& Manager::run(high_resolution_clock::duration runtime) {
191 
192  return run([&]() { return _elapsed < runtime; });
193 
194  }
195 
199 
200  return run([&]() { return _running; });
201 
202  }
203 
207  Manager& Manager::run(std::function<bool()> condition) {
208 
209  _start_time = high_resolution_clock::now();
210  _elapsed = high_resolution_clock::duration::zero();
211  start();
212 
213  while ( condition() ) {
214  update();
215  std::this_thread::sleep_for(_nice_sleep_amount);
216  update_elapsed_time();
217  }
218 
219  stop();
220 
221  return *this;
222 
223  }
224 
229  void Manager::update_elapsed_time() {
230 
231  // Only run in simulated time if we have the flag set and have processes.
232  if(_simulated_time && !_processes.empty()) {
233 
234  // Find the process that has the smallest time till next update.
235  auto min_iter = std::min_element(_processes.begin(), _processes.end() ,[](Process * lhs, Process * rhs) {
236  auto lhsTime = lhs->last_update() + lhs->period();
237  auto rhsTime = rhs->last_update() + rhs->period();
238  return lhsTime < rhsTime;
239  });
240 
241  Process* nextup = *min_iter;
242 
243  auto newTime = nextup->last_update() + nextup->period();
244 
245  _elapsed = newTime;
246 
247  } else {
248 
249  _elapsed = high_resolution_clock::now() - _start_time;
250 
251  }
252  }
253 
254 }
The Process Manager class.
Definition: manager.h:27
Manager & schedule(Process &process, high_resolution_clock::duration period)
Definition: manager.cc:13
Manager & run()
Definition: manager.cc:198
Channel & channel(string)
Definition: manager.cc:50
Manager & watch(string event_name, std::function< void(Event &)> handler)
Definition: manager.cc:68
Manager & sort_processes()
Definition: manager.cc:144
Events that can be emitted, watched, and responded to with event handlers.
Definition: event.h:23
Manager & add_channel(Channel &)
Definition: manager.cc:43
virtual void stop()=0
Manager & use_simulated_time()
Definition: manager.cc:156
std::string name() const
Definition: event.h:41
Manager & init()
Definition: manager.cc:109
high_resolution_clock::duration last_update()
Definition: process.h:97
high_resolution_clock::duration period()
Definition: process.h:84
Manager & update()
Definition: manager.cc:130
Manager & start()
Definition: manager.cc:116
An exception class for Elma.
Definition: exceptions.h:13
Manager & all(std::function< void(Process &)> f)
Definition: manager.cc:100
bool propagate() const
Definition: event.h:45
Manager & remove(Process &process)
Definition: manager.cc:31
A channel for sending double values to and from Process objects.
Definition: channel.h:21
Manager & emit(const Event &event)
Definition: manager.cc:85
Manager & stop()
Definition: manager.cc:123
Manager & set_priority(Process &process, int priority)
Definition: manager.cc:176
Definition: channel.cc:5
Manager & use_real_time()
Definition: manager.cc:165
An abstract base class for processes.
Definition: process.h:24
string name()
Definition: channel.h:53
Client & process_responses()
Definition: client.cc:39