-
Notifications
You must be signed in to change notification settings - Fork 21
Expand file tree
/
Copy pathMyWorker.hpp
More file actions
194 lines (162 loc) · 6.76 KB
/
MyWorker.hpp
File metadata and controls
194 lines (162 loc) · 6.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
/******************************************************************************
*
* Copyright (c) Phoenix Contact GmbH & Co. KG. All rights reserved.
* Licensed under the MIT. See LICENSE file in the project root for full license information.
*
******************************************************************************/
#pragma once
#include "Arp/System/Commons/Threading/Thread.hpp"
#include "Arp/System/Commons/Exceptions/Exceptions.h"
#include "Arp/System/Commons/Chrono/SystemTick.hpp"
#include "Arp/System/Commons/Threading/Mutex.hpp"
#include "Arp/Base/Commons/Logging/Log.hpp"
#include <queue>
#include <utility>
namespace BufferedExchange
{
using namespace std::chrono;
using namespace Arp::System::Commons::Threading;
using namespace Arp::Base::Commons::Logging;
template<typename S>
class MyWorker
{
#define MAX_ELEMENTS 1000
#define MIN_QUEUE_SIZE_TO_PROCESS 10
public: // Properties
int RunCount;
bool Stop;
Mutex d1;
public: // Methods the Threads call.
// single Execution e.g. for WorkerThreads.
void RunSingle(void)
{
++this->RunCount;
process();
};
//"Endless loop" for normal thread
void Run(void *t)
{
while (!this->Stop)
{
RunSingle();
// Sleep for X ms until next RunSingle execution.
Thread::Sleep(999);
}
};
//Static thread implementation
static void RunStatic(void *pParam)
{
((MyWorker *)pParam)->Run(nullptr);
};
public: // Construction
MyWorker() : RunCount(0),
Stop(true)
{
Log::Initialize("MyWorker");
Log::Info("------------------- MyWorker Constructed: {0}",Stop);
}
~MyWorker()
{
Log::Info("------------------- MyWorker Deconstructed: {0}",Stop);
Stop = true;
}
private:
// use thread safe data storage of your choice or implement with semaphores|mutex etc.
// active queue for processing
std::queue<std::pair<double, S>> data_toprocess;
// active queue for storing
std::queue<std::pair<double, S>> data_storage;
// Timepoint of last SetData command to log time between entries.
std::chrono::nanoseconds last_time{};
private:
void process()
{
try
{
// print first Element of the queue
if(!data_toprocess.empty()){
Log::Info("---------------- {2} firstElement: time:{0} data:{1} ",
data_toprocess.front().first,
data_toprocess.front().second,
Thread::GetCurrentThread()->GetName());
}
// add another Mutex here if you need to ensure only one thread is processing the data.
while (!data_toprocess.empty() && !Stop)
{
// Do something with the first element of this queue
// in this case we are printing the Last Elemnt of the the queue.
if (data_toprocess.size() == 1)
{
Log::Info("----------------{2} lastElement time:{0} data:{1}",
data_toprocess.front().first,
data_toprocess.front().second,
Thread::GetCurrentThread()->GetName());
}
// remove first element.
data_toprocess.pop();
}
//check data_storage size to wait for right amount to be processed.
//you might want to collect data (e.g. before sending them via TCP to)
//And while you are not processing you do not need to swap.
while (data_storage.size() < MIN_QUEUE_SIZE_TO_PROCESS && !Stop)
{
Thread::Sleep(10);
}
// Get a lock on the Store to swap it.
// this operation is blocking but only blocks the thread.
d1.Lock();
Log::Info("----------------{0} Swap queue pointers data_storage<<-->>data_toprocess", Thread::GetCurrentThread()->GetName());
std::swap(data_storage, data_toprocess);
d1.Unlock();
}
catch (...)
{
Stop = true;
}
};
public: // Data Interface
// 1 = Okay || 0 = Error
bool SetData(S x)
{
bool result{true};
// make sure only one process can push to the queue at once.
d1.Lock();
try
{
if(Stop) {
throw InvalidOperationException("Data processing is stopped please do not push");
}
if(data_storage.size() > MAX_ELEMENTS) {
throw OutOfMemoryException("DataStore has to many elements already!");
}
// Lock to ensure the data_storage does not swap during push.
auto time = Arp::System::Commons::Chrono::SystemTick::GetNanoTick();
data_storage.push(std::pair<double, S>( duration_cast<duration<double>>(time - last_time).count(), x));
last_time = time;
}
// error handling for pushing // full buffer etc.
catch (InvalidOperationException &e){
Log::Error("--- Exception in ThreadProcess:{0}",Thread::GetCurrentThread()->GetName());
Log::Error("--- {0}",e.GetMessage());
// Do something to fix this or rethrow to handle elsewhere?
result = false;
}
catch (OutOfMemoryException &e){
Log::Error("--- Exception in ThreadProcess:{0}",Thread::GetCurrentThread()->GetName());
Log::Error("--- {0}",e.GetMessage());
Log::Error("--- Process Queue:{0}, Store Queue:{1} ", data_toprocess.size(),data_storage.size());
// Do something to fix this or rethrow to handle elsewhere?
result = false;
}
catch (Exception &e)
{
Log::Error("--- Exception in ThreadProcess:{0}",Thread::GetCurrentThread()->GetName());
Log::Error("--- {0}",e.GetMessage());
Stop = true;
result = false;
}
d1.Unlock();
return result;
};
};
} // namespace BufferedExchange