-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathdevice.py
More file actions
205 lines (177 loc) · 6.66 KB
/
device.py
File metadata and controls
205 lines (177 loc) · 6.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
from __future__ import annotations

import logging
from datetime import datetime
from typing import AsyncGenerator, Iterator, Optional, Union

import grpc
from google.protobuf.empty_pb2 import Empty

from synapse.api.device_pb2 import (
    UpdateDeviceSettingsRequest,
    UpdateDeviceSettingsResponse,
)
from synapse.api.logging_pb2 import (
    LogQueryResponse,
    LogQueryRequest,
    LogLevel,
    TailLogsRequest,
)
from synapse.api.query_pb2 import StreamQueryRequest, StreamQueryResponse
from synapse.api.status_pb2 import StatusCode, Status
from synapse.api.synapse_pb2_grpc import SynapseDeviceStub
from synapse.client.config import Config
from synapse.utils.log import log_level_to_pb
# Port that Device.__init__ appends to a URI that is not already in
# "host:port" form.
DEFAULT_SYNAPSE_PORT = 647
class Device(object):
    """Synchronous gRPC client for a single Synapse device.

    Each public method wraps one RPC on ``SynapseDeviceStub``. RPC failures
    (``grpc.RpcError``) are logged and reported through the return value
    (``False`` or ``None``) instead of being raised.
    """

    def __init__(self, uri, verbose=False):
        """Open an insecure channel to the device at *uri*.

        Args:
            uri: Device address. ``DEFAULT_SYNAPSE_PORT`` is appended when the
                address does not already carry a port.
            verbose: When true the logger is set to DEBUG, otherwise ERROR.

        Raises:
            ValueError: If *uri* is empty or ``None``.
        """
        if not uri:
            raise ValueError("URI cannot be empty or none")

        self.uri = self._with_default_port(uri)
        self.channel = grpc.insecure_channel(self.uri)
        self.rpc = SynapseDeviceStub(self.channel)

        # NOTE: this is the module-level logger, so the level set here is
        # shared by every Device created in the process.
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(logging.DEBUG if verbose else logging.ERROR)

    @staticmethod
    def _with_default_port(uri):
        """Return *uri* unchanged if it has a port, else with the default port."""
        if uri.startswith("["):
            # Bracketed IPv6 literal: the address itself contains colons, so a
            # port is present only when something follows the closing bracket.
            has_port = "]:" in uri
        else:
            has_port = uri.count(":") == 1
        return uri if has_port else f"{uri}:{DEFAULT_SYNAPSE_PORT}"

    @staticmethod
    def _build_log_query(log_level, since_ms, start_time, end_time):
        """Build the ``LogQueryRequest`` shared by the ``get_logs*`` methods.

        ``since_ms`` takes precedence: when it is given, ``start_time`` and
        ``end_time`` are ignored.
        """
        request = LogQueryRequest()
        request.min_level = log_level_to_pb(log_level)
        if since_ms is not None:
            request.since_ms = since_ms
            return request
        if start_time:
            request.start_time_ns = int(start_time.timestamp() * 1e9)
        if end_time:
            request.end_time_ns = int(end_time.timestamp() * 1e9)
        return request

    def start(self) -> Union[Status, bool]:
        """Call the Start RPC.

        Returns:
            The response when its status code is ``kOk``; ``False`` when the
            status is not OK or the RPC itself fails.
        """
        try:
            response = self.rpc.Start(Empty())
            if self._handle_status_response(response):
                return response
        except grpc.RpcError as e:
            # Logged at ERROR like every other RPC wrapper so the failure is
            # visible when the logger is not in verbose mode.
            self.logger.error("Error: %s", e.details())
        return False

    def start_with_status(self) -> Optional[Status]:
        """Call the Start RPC and return its response without inspecting it.

        Returns:
            The RPC response, or ``None`` if the RPC fails.
        """
        try:
            return self.rpc.Start(Empty())
        except grpc.RpcError as e:
            self.logger.error("Error: %s", e.details())
            return None

    def stop(self) -> Union[Status, bool]:
        """Call the Stop RPC.

        Returns:
            The response when its status code is ``kOk``; ``False`` when the
            status is not OK or the RPC itself fails.
        """
        try:
            response = self.rpc.Stop(Empty())
            if self._handle_status_response(response):
                return response
        except grpc.RpcError as e:
            self.logger.error("Error: %s", e.details())
        return False

    def stop_with_status(self) -> Optional[Status]:
        """Call the Stop RPC and return its response without inspecting it.

        Returns:
            The RPC response, or ``None`` if the RPC fails.
        """
        try:
            return self.rpc.Stop(Empty())
        except grpc.RpcError as e:
            self.logger.error("Error: %s", e.details())
            return None

    def info(self):
        """Call the Info RPC.

        A non-OK ``response.status`` is logged, but the response is returned
        either way.

        Returns:
            The RPC response, or ``None`` if the RPC fails.
        """
        try:
            response = self.rpc.Info(Empty())
            # Result deliberately ignored: called only for its error logging.
            self._handle_status_response(response.status)
            return response
        except grpc.RpcError as e:
            self.logger.error("Error: %s", e.details())
            return None

    def query(self, query):
        """Call the Query RPC with *query*.

        Returns:
            The RPC response, or ``None`` if the RPC fails.
        """
        try:
            return self.rpc.Query(query)
        except grpc.RpcError as e:
            self.logger.error("Error: %s", e.details())
            return None

    def configure(self, config: Config) -> Union[Status, bool]:
        """Attach *config* to this device and send it via the Configure RPC.

        Returns:
            The response when its status code is ``kOk``; ``False`` when the
            status is not OK or the RPC itself fails.

        Raises:
            AssertionError: If *config* is not a ``Config`` instance.
        """
        # NOTE(review): assert is stripped under ``python -O``; kept because
        # callers may rely on AssertionError.
        assert isinstance(config, Config), "config must be an instance of Config"

        config.set_device(self)
        try:
            response = self.rpc.Configure(config.to_proto())
            if self._handle_status_response(response):
                return response
        except grpc.RpcError as e:
            self.logger.error("Error: %s", e.details())
        return False

    def configure_with_status(self, config: Config) -> Optional[Status]:
        """Like ``configure`` but return the response without inspecting it.

        Returns:
            The RPC response, or ``None`` if the RPC fails.

        Raises:
            AssertionError: If *config* is not a ``Config`` instance.
        """
        assert isinstance(config, Config), "config must be an instance of Config"

        config.set_device(self)
        try:
            return self.rpc.Configure(config.to_proto())
        except grpc.RpcError as e:
            self.logger.error("Error: %s", e.details())
            return None

    def get_name(self) -> Optional[str]:
        """Return the ``name`` field of ``info()``, or ``None`` if it fails."""
        info = self.info()
        return info.name if info else None

    def get_logs(
        self,
        log_level: Union[str, LogLevel] = "INFO",
        since_ms: Optional[int] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
    ) -> Optional[LogQueryResponse]:
        """Call the GetLogs RPC.

        Args:
            log_level: Minimum level, converted with ``log_level_to_pb``.
            since_ms: Sent as the request's ``since_ms``. When given,
                *start_time* and *end_time* are ignored.
            start_time: Lower bound, sent as nanoseconds since the epoch.
            end_time: Upper bound, sent as nanoseconds since the epoch.

        Returns:
            The RPC response, or ``None`` if the RPC fails.
        """
        try:
            request = self._build_log_query(log_level, since_ms, start_time, end_time)
            return self.rpc.GetLogs(request)
        except grpc.RpcError as e:
            self.logger.error("Error: %s", e.details())
            return None

    def get_logs_with_status(
        self,
        log_level: Union[str, LogLevel] = LogLevel.LOG_LEVEL_INFO,
        since_ms: Optional[int] = None,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
    ) -> Optional[LogQueryResponse]:
        """Call the GetLogs RPC; same request and result as ``get_logs``.

        Differs from ``get_logs`` only in the default for *log_level*.

        Returns:
            The RPC response, or ``None`` if the RPC fails.
        """
        try:
            request = self._build_log_query(log_level, since_ms, start_time, end_time)
            return self.rpc.GetLogs(request)
        except grpc.RpcError as e:
            self.logger.error("Error: %s", e.details())
            return None

    def tail_logs(
        self, log_level: Union[str, LogLevel] = LogLevel.LOG_LEVEL_INFO
    ) -> Optional[Iterator[LogQueryResponse]]:
        """Call the TailLogs RPC and return its response stream.

        Only errors raised while the call is being made are caught here.
        Anything raised later, while the caller iterates the returned stream,
        propagates to the caller.

        Returns:
            Whatever ``self.rpc.TailLogs`` returns, or ``None`` if making the
            call raises ``grpc.RpcError``.
        """
        try:
            request = TailLogsRequest()
            request.min_level = log_level_to_pb(log_level)
            return self.rpc.TailLogs(request)
        except grpc.RpcError as e:
            self.logger.error("Error: %s", e.details())
            return None

    def stream_query(
        self, stream_request: StreamQueryRequest
    ) -> Iterator[StreamQueryResponse]:
        """Yield each response from the StreamQuery RPC.

        If any exception is raised while streaming, it is logged and a single
        ``StreamQueryResponse`` with code ``kQueryFailed`` is yielded before
        the generator ends.
        """
        try:
            for response in self.rpc.StreamQuery(stream_request):
                yield response
        except Exception as e:
            self.logger.error("Error during StreamQuery: %s", e)
            yield StreamQueryResponse(code=StatusCode.kQueryFailed)

    def update_device_settings(
        self, request: UpdateDeviceSettingsRequest
    ) -> Optional[UpdateDeviceSettingsResponse]:
        """Call the UpdateDeviceSettings RPC.

        Returns:
            The RPC response, or ``None`` if any exception is raised.
        """
        try:
            return self.rpc.UpdateDeviceSettings(request)
        except Exception as e:
            self.logger.error("Error during update settings: %s", e)
            return None

    def _handle_status_response(self, status):
        """Return True if ``status.code`` is ``kOk``; otherwise log it and return False."""
        if status.code == StatusCode.kOk:
            return True
        self.logger.error("Error %d: %s", status.code, status.message)
        return False