Skip to content

Commit fa13050

Browse files
authored
Feature: Write taps, support loading a manifest, update to api (#120)
* Feature: Support loading a manifest, update to api * Added some tap examples for writing * Added manifest loading * bump version * removed print
1 parent 8520326 commit fa13050

File tree

13 files changed

+202
-65
lines changed

13 files changed

+202
-65
lines changed

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727
setup(
2828
name="science-synapse",
29-
version="2.2.5",
29+
version="2.2.6",
3030
description="Client library and CLI for the Synapse API",
3131
author="Science Team",
3232
author_email="[email protected]",

synapse/cli/device_info_display.py

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,32 @@
1-
import time
21
from rich.console import Console
3-
from rich.panel import Panel
4-
from rich.table import Table
52
from rich.tree import Tree
63
from google.protobuf.json_format import MessageToDict
74
from synapse.client.device import Device
85

96

10-
def visualize_configuration(info_dict):
7+
def visualize_configuration(info_dict, status):
8+
nodes_status = status.get("signal_chain", {}).get("nodes", {})
119
config = info_dict.get("configuration", {})
1210
if config:
1311
tree = Tree("Configuration")
14-
for node in config.get("nodes", []):
12+
for index, node in enumerate(config.get("nodes", [])):
1513
node_type = node.get("type", "").replace("k", "")
16-
node_name = node.get("name", "Unknown")
17-
node_tree = tree.add(f"{node_name}")
14+
node_tree = tree.add(f"{node_type}")
1815
node_tree.add(f"ID: {node.get('id', 'Unknown')}")
19-
node_tree.add(f"Type: {node_type}")
20-
2116
if node_type == "Application":
2217
app = node.get("application", {})
2318
name = app.get("name", "Unknown")
24-
running = app.get("running", False)
25-
status = "[green]Running[/green]" if running else "[red]Stopped[/red]"
19+
20+
application_status = nodes_status[index].get("application", None)
21+
running = application_status.get("running", False)
22+
error_logs = application_status.get(
23+
"error_logs", "Could not get error logs"
24+
)
2625
node_tree.add(f"Name: {name}")
27-
node_tree.add(f"Status: {status}")
26+
node_tree.add(f"Running: {running}")
27+
node_tree.add(f"Error Logs:\n{error_logs}")
2828
elif node_type == "BroadbandSource":
2929
source = node.get("broadband_source", {})
30-
name = source.get("name", "Unknown")
31-
running = source.get("running", False)
32-
status = "[green]Running[/green]" if running else "[red]Stopped[/red]"
33-
node_tree.add(f"Name: {name}")
34-
node_tree.add(f"Status: {status}")
3530
if "signal" in source and "electrode" in source["signal"]:
3631
channels = source["signal"]["electrode"].get("channels", [])
3732
electrode_ids = [
@@ -113,4 +108,4 @@ def summary(self, device: Device):
113108
)
114109

115110
self.console.print(visualize_peripherals(info_dict))
116-
self.console.print(visualize_configuration(info_dict))
111+
self.console.print(visualize_configuration(info_dict, status))

synapse/cli/rpc.py

Lines changed: 17 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,18 @@
44
from typing import Optional
55

66
import synapse as syn
7-
from synapse.api.synapse_pb2 import DeviceConfiguration
87
from synapse.api.query_pb2 import QueryRequest, QueryResponse, StreamQueryRequest
98
from synapse.api.status_pb2 import StatusCode
109

1110
from google.protobuf import text_format
1211
from google.protobuf.json_format import Parse
1312

1413
from rich.console import Console
15-
from rich.pretty import pprint
1614

1715
from synapse.cli.query import StreamingQueryClient
1816
from synapse.utils.log import log_entry_to_str
1917
from synapse.cli.device_info_display import DeviceInfoDisplay
18+
from synapse.utils.proto import load_device_config
2019

2120

2221
def add_commands(subparsers):
@@ -182,7 +181,7 @@ def start(args):
182181

183182
console = Console()
184183

185-
config_obj = None # syn.Config if we are provided a *.json* file
184+
config_obj = None
186185
cfg_path = getattr(args, "config_file", None)
187186

188187
if cfg_path:
@@ -196,10 +195,7 @@ def start(args):
196195

197196
# Load the configuration proto and build Config object
198197
try:
199-
with open(cfg_path, "r") as f:
200-
json_text = f.read()
201-
cfg_proto = Parse(json_text, DeviceConfiguration())
202-
config_obj = syn.Config.from_proto(cfg_proto)
198+
config_obj = load_device_config(cfg_path, console)
203199
except Exception as e:
204200
console.print(
205201
f"[bold red]Failed to parse configuration file[/bold red]: {e}"
@@ -263,25 +259,23 @@ def stop(args):
263259

264260

265261
def configure(args):
262+
console = Console()
266263
if Path(args.config_file).suffix != ".json":
267-
print("Configuration file must be a JSON file")
264+
console.print("[bold red]Configuration file must be a JSON file")
268265
return False
269266

270-
with open(args.config_file) as config_json:
271-
console = Console()
272-
config_proto = Parse(config_json.read(), DeviceConfiguration())
273-
console.print("Configuring device with the following configuration:")
274-
config = syn.Config.from_proto(config_proto)
275-
console.print(config.to_proto())
276-
277-
config_ret = syn.Device(args.uri, args.verbose).configure_with_status(config)
278-
if not config_ret:
279-
console.print("[bold red]Internal error configuring device")
280-
return
281-
if config_ret.code != StatusCode.kOk:
282-
console.print(f"[bold red]Error configuring\n{config_ret.message}")
283-
return
284-
console.print("[green]Device configured")
267+
config_obj = load_device_config(args.config_file, console)
268+
console.print("Configuring device with the following configuration:")
269+
console.print(config_obj.to_proto())
270+
271+
config_ret = syn.Device(args.uri, args.verbose).configure_with_status(config_obj)
272+
if not config_ret:
273+
console.print("[bold red]Internal error configuring device")
274+
return
275+
if config_ret.code != StatusCode.kOk:
276+
console.print(f"[bold red]Error configuring\n{config_ret.message}")
277+
return
278+
console.print("[green]Device configured")
285279

286280

287281
def get_logs(args):

synapse/cli/streaming.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
from synapse.api.node_pb2 import NodeType
1515
from synapse.api.status_pb2 import DeviceState, StatusCode
16-
from synapse.api.synapse_pb2 import DeviceConfiguration
16+
from synapse.api.device_pb2 import DeviceConfiguration
1717
import synapse as syn
1818
import synapse.client.channel as channel
1919
import synapse.utils.ndtp_types as ndtp_types

synapse/cli/taps.py

Lines changed: 87 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,72 @@
11
from synapse.client.taps import Tap
22

33
from rich.console import Console
4-
from rich.pretty import pprint
54
from rich.table import Table
5+
from rich.live import Live
6+
from rich.text import Text
7+
8+
import time
9+
10+
11+
class TapHealthMonitor:
12+
"""Health monitor for streaming tap data with real-time statistics display."""
13+
14+
def __init__(self, console: Console):
15+
self.console = console
16+
self.message_count = 0
17+
self.total_bytes = 0
18+
self.start_time = None
19+
20+
def start(self):
21+
"""Start the monitoring session."""
22+
self.start_time = time.time()
23+
self.message_count = 0
24+
self.total_bytes = 0
25+
26+
def update(self, message_size: int) -> Text:
27+
"""Update statistics with a new message and return formatted display text."""
28+
current_time = time.time()
29+
self.message_count += 1
30+
self.total_bytes += message_size
31+
32+
# Calculate stats
33+
elapsed_time = current_time - self.start_time
34+
msgs_per_sec = self.message_count / elapsed_time if elapsed_time > 0 else 0
35+
bandwidth_bps = self.total_bytes / elapsed_time if elapsed_time > 0 else 0
36+
37+
# Format bandwidth
38+
bandwidth_str = self._format_bandwidth(bandwidth_bps)
39+
40+
# Create formatted display text
41+
return self._create_display_text(
42+
self.message_count, msgs_per_sec, bandwidth_str, message_size
43+
)
44+
45+
def _format_bandwidth(self, bandwidth_bps: float) -> str:
46+
"""Format bandwidth with appropriate units."""
47+
if bandwidth_bps >= 1024 * 1024:
48+
return f"{bandwidth_bps / (1024 * 1024):.2f} MB/s"
49+
elif bandwidth_bps >= 1024:
50+
return f"{bandwidth_bps / 1024:.2f} KB/s"
51+
else:
52+
return f"{bandwidth_bps:.1f} B/s"
53+
54+
def _create_display_text(
55+
self, msg_count: int, rate: float, bandwidth: str, latest_size: int
56+
) -> Text:
57+
"""Create styled text for the live display."""
58+
stats_text = Text()
59+
stats_text.append("Messages: ", style="bold")
60+
stats_text.append(f"{msg_count:,}", style="cyan")
61+
stats_text.append(" | msgs/sec: ", style="bold")
62+
stats_text.append(f"{rate:.1f}/s", style="green")
63+
stats_text.append(" | Bandwidth: ", style="bold")
64+
stats_text.append(bandwidth, style="yellow")
65+
stats_text.append(" | Latest: ", style="bold")
66+
stats_text.append(f"{latest_size:,} bytes", style="magenta")
67+
stats_text.append(" | Runtime: ", style="bold")
68+
stats_text.append(f"{time.time() - self.start_time:.1f}s", style="blue")
69+
return stats_text
670

771

872
def add_commands(subparsers):
@@ -48,9 +112,26 @@ def stream_taps(args):
48112

49113
console = Console()
50114
console.print(f"[bold cyan]Streaming tap:[/] [green]{args.tap_name}[/]")
115+
console.print("[dim]Press Ctrl+C to stop[/]\n")
51116

52-
for message in tap.stream():
53-
message_size = len(str(message))
54-
console.print(f"[bold]Message Size:[/] [cyan]{message_size} bytes[/]")
55-
pprint(message, expand_all=False)
56-
console.print("---")
117+
# Initialize health monitor
118+
monitor = TapHealthMonitor(console)
119+
monitor.start()
120+
121+
# Create initial display
122+
initial_text = Text("Waiting for messages...", style="dim")
123+
124+
try:
125+
with Live(initial_text, console=console, refresh_per_second=10) as live:
126+
for message in tap.stream():
127+
message_size = len(message)
128+
129+
# Update statistics and get formatted display
130+
stats_text = monitor.update(message_size)
131+
132+
# Update the live display
133+
live.update(stats_text)
134+
except KeyboardInterrupt:
135+
pass
136+
finally:
137+
tap.disconnect()

synapse/client/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from synapse.api.synapse_pb2 import DeviceConfiguration
1+
from synapse.api.device_pb2 import DeviceConfiguration
22
from synapse.api.node_pb2 import NodeConnection
33
from synapse.client.nodes import NODE_TYPE_OBJECT_MAP
44

synapse/client/nodes/application_node.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,13 @@
66
class ApplicationNode(Node):
77
type = NodeType.kApplication
88

9-
def __init__(self, name: str):
9+
def __init__(self, name: str, parameters):
1010
self.name = name
11+
self.parameters = parameters
1112

1213
def _to_proto(self):
1314
n = NodeConfig()
14-
p = ApplicationNodeConfig(name=self.name)
15+
p = ApplicationNodeConfig(name=self.name, parameters=self.parameters)
1516
n.application.CopyFrom(p)
1617
return n
1718

@@ -21,5 +22,4 @@ def _from_proto(proto: ApplicationNodeConfig):
2122
raise ValueError("parameter 'proto' is missing")
2223
if not isinstance(proto, ApplicationNodeConfig):
2324
raise ValueError("proto is not of type ApplicationNodeConfig")
24-
25-
return ApplicationNode(name=proto.name)
25+
return ApplicationNode(name=proto.name, parameters=proto.parameters)

synapse/client/taps.py

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import logging
2+
import time
23
import zmq
34
from typing import Optional, Generator
45

56
from synapse.api.query_pb2 import QueryRequest
67
from synapse.api.status_pb2 import StatusCode
8+
from synapse.api.tap_pb2 import TapType
79

810

911
class Tap(object):
@@ -75,7 +77,14 @@ def connect(self, name: str) -> bool:
7577

7678
# Initialize ZMQ context and socket
7779
self.zmq_context = zmq.Context()
78-
self.zmq_socket = self.zmq_context.socket(zmq.SUB)
80+
81+
# Create appropriate socket type based on tap type from the selected tap
82+
if selected_tap.tap_type == TapType.TAP_TYPE_CONSUMER:
83+
# For consumer taps, we need to publish data TO the tap
84+
self.zmq_socket = self.zmq_context.socket(zmq.PUB)
85+
else:
86+
# For producer taps (or unspecified), we need to subscribe and listen FROM the tap
87+
self.zmq_socket = self.zmq_context.socket(zmq.SUB)
7988

8089
# Replace the endpoint with our device URI if needed
8190
endpoint = selected_tap.endpoint
@@ -88,9 +97,17 @@ def connect(self, name: str) -> bool:
8897
endpoint = f"{protocol}://{self.uri.split(':')[0]}:{port}"
8998

9099
try:
91-
print(f"Connecting to tap '{name}' at {endpoint}")
92100
self.zmq_socket.connect(endpoint)
93-
self.zmq_socket.setsockopt(zmq.SUBSCRIBE, b"") # Subscribe to all messages
101+
102+
# Give the socket a chance to connect
103+
self.logger.info("Waiting for socket to connect...")
104+
time.sleep(1)
105+
106+
# Only set subscription options for subscriber sockets
107+
if selected_tap.tap_type != TapType.TAP_TYPE_CONSUMER:
108+
self.zmq_socket.setsockopt(zmq.SUBSCRIBE, b"")
109+
print("Subscribed to all messages")
110+
94111
return True
95112
except zmq.ZMQError as e:
96113
self.logger.error(f"Failed to connect to tap: {e}")
@@ -123,6 +140,33 @@ def read(self, timeout_ms: int = 1000) -> Optional[bytes]:
123140
self.logger.error(f"Error receiving message: {e}")
124141
return None
125142

143+
def send(self, data: bytes) -> bool:
144+
"""Send raw data to the tap (only works for consumer taps with PUB socket).
145+
146+
Args:
147+
data (bytes): Raw message data to send.
148+
149+
Returns:
150+
bool: True if sent successfully, False otherwise.
151+
"""
152+
if not self.zmq_socket:
153+
self.logger.error("Not connected to any tap")
154+
return False
155+
156+
if (
157+
not self.connected_tap
158+
or self.connected_tap.tap_type != TapType.TAP_TYPE_CONSUMER
159+
):
160+
self.logger.error("Send is only available for consumer taps")
161+
return False
162+
163+
try:
164+
self.zmq_socket.send(data)
165+
return True
166+
except zmq.ZMQError as e:
167+
self.logger.error(f"Error sending message: {e}")
168+
return False
169+
126170
def stream(self, timeout_ms: int = 1000) -> Generator[bytes, None, None]:
127171
"""Stream raw data from the tap.
128172

synapse/server/rpc.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from synapse.api.logging_pb2 import LogLevel, LogQueryResponse
1111
from synapse.api.query_pb2 import QueryResponse
1212
from synapse.api.status_pb2 import DeviceState, Status, StatusCode
13-
from synapse.api.synapse_pb2 import DeviceConfiguration, DeviceInfo
13+
from synapse.api.device_pb2 import DeviceConfiguration, DeviceInfo
1414
from synapse.api.synapse_pb2_grpc import (
1515
SynapseDeviceServicer,
1616
add_SynapseDeviceServicer_to_server,

0 commit comments

Comments
 (0)