Skip to content

Commit 23de5e3

Browse files
Splinter1984eboasson
authored andcommitted
ddspy: initial qos_provider support impl.
Implementation of `QosProvider` class. Support for `qos_provider` functionality in python API + basic test implemntation.
1 parent 5f892ba commit 23de5e3

File tree

4 files changed

+201
-1
lines changed

4 files changed

+201
-1
lines changed

cyclonedds/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
1111
"""
1212

13-
from . import internal, util, qos, core, domain, topic, pub, sub, builtin, dynamic, idl
13+
from . import internal, util, qos, core, domain, topic, pub, sub, builtin, dynamic, idl, qos_provider
1414

1515
__all__ = [
1616
"internal",
@@ -24,4 +24,5 @@
2424
"builtin",
2525
"dynamic",
2626
"idl",
27+
"qos_provider",
2728
]

cyclonedds/internal.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,13 +285,23 @@ class stat_kind(IntEnum):
285285
DDS_STAT_KIND_LENGTHTIME = 2
286286

287287

288+
class qos_kind(IntEnum):
289+
DDS_PARTICIPANT_QOS = 0
290+
DDS_PUBLISHER_QOS = 1
291+
DDS_SUBSCRIBER_QOS = 2
292+
DDS_TOPIC_QOS = 3
293+
DDS_READER_QOS = 4
294+
DDS_WRITER_QOS = 5
295+
296+
288297
class stat_value(ct.Union):
289298
_fields_ = [
290299
('u32', ct.c_uint32),
291300
('u64', ct.c_uint64),
292301
('lengthtime', ct.c_uint64)
293302
]
294303

304+
295305
class stat_keyvalue(ct.Structure):
296306
_fields_ = [
297307
('name', ct.c_char_p),
@@ -319,6 +329,8 @@ class dds_c_t: # noqa N801
319329
destination_order = ct.c_int
320330
data_representation_id = ct.c_int16
321331
qos_p = ct.c_void_p
332+
qos_provider_p = ct.c_void_p
333+
qos_kind = ct.c_int32
322334
attach = ct.c_void_p
323335
listener_p = ct.c_void_p
324336
topic_descriptor_p = ct.c_void_p

cyclonedds/qos_provider.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
"""
2+
* Copyright(c) 2025 ZettaScale Technology and others
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Eclipse Public License v. 2.0 which is available at
6+
* http://www.eclipse.org/legal/epl-2.0, or the Eclipse Distribution License
7+
* v. 1.0 which is available at
8+
* http://www.eclipse.org/org/documents/edl-v10.php.
9+
*
10+
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
11+
"""
12+
13+
import ctypes as ct
14+
from typing import Any, Optional
15+
16+
from .core import DDSException
17+
from .internal import DDS, c_call, dds_c_t, qos_kind
18+
from .qos import (
19+
DataReaderQos, DataWriterQos, DomainParticipantQos, PublisherQos, Qos,
20+
SubscriberQos, TopicQos, _CQos,
21+
)
22+
23+
24+
class QosProvider(DDS):
25+
"""
26+
Representing QosProvider
27+
"""
28+
29+
_qos_provider: dds_c_t.qos_provider_p
30+
31+
def __init__(
32+
self,
33+
path: Any,
34+
scope: Optional[str] = ""):
35+
bpath = path
36+
if not isinstance(path, str) and not isinstance(path, bytes):
37+
bpath = path.__str__().encode('utf-8')
38+
elif isinstance(path, str):
39+
bpath = path.encode('utf-8')
40+
bscope = scope.encode('utf-8')
41+
self._qos_provider = dds_c_t.qos_provider_p(0)
42+
ret = self._create_qos_provider_scope(
43+
bpath, ct.byref(self._qos_provider), bscope
44+
)
45+
if ret < 0 or not self._qos_provider:
46+
raise DDSException(ret, f"Occured when initialize {repr(self)}")
47+
48+
def _get_qos_kind(self, key: str, kind: qos_kind) -> Qos:
49+
bkey = key.encode('utf-8')
50+
cqos = dds_c_t.qos_p(0)
51+
ret = self._qos_provider_get_qos(
52+
self._qos_provider, kind, bkey, ct.byref(cqos)
53+
)
54+
if ret < 0:
55+
raise DDSException(ret, f"Occured trying to get_topic_qos {repr(self)}")
56+
return _CQos.cqos_to_qos(cqos)
57+
58+
def get_datawriter_qos(self, key: str = "") -> 'DataWriterQos':
59+
return self._get_qos_kind(key, qos_kind.DDS_WRITER_QOS).datawriter()
60+
61+
def get_datareader_qos(self, key: str = "") -> 'DataReaderQos':
62+
return self._get_qos_kind(key, qos_kind.DDS_READER_QOS).datareader()
63+
64+
def get_domain_participant_qos(self, key: str = "") -> 'DomainParticipantQos':
65+
return self._get_qos_kind(key, qos_kind.DDS_PARTICIPANT_QOS).domain_participant()
66+
67+
def get_publisher_qos(self, key: str = "") -> 'PublisherQos':
68+
return self._get_qos_kind(key, qos_kind.DDS_PUBLISHER_QOS).publisher()
69+
70+
def get_subscriber_qos(self, key: str = "") -> 'SubscriberQos':
71+
return self._get_qos_kind(key, qos_kind.DDS_SUBSCRIBER_QOS).subscriber()
72+
73+
def get_topic_qos(self, key: str = "") -> 'TopicQos':
74+
return self._get_qos_kind(key, qos_kind.DDS_TOPIC_QOS).topic()
75+
76+
def __del__(self) -> None:
77+
self._delete_qos_provider(self._qos_provider)
78+
79+
@c_call("dds_create_qos_provider_scope")
80+
def _create_qos_provider_scope(
81+
self, path: ct.c_char_p, provider: ct.POINTER(dds_c_t.qos_provider_p),
82+
scope: ct.c_char_p
83+
) -> dds_c_t.returnv:
84+
pass
85+
86+
@c_call("dds_delete_qos_provider")
87+
def _delete_qos_provider(
88+
self, provider: dds_c_t.qos_provider_p
89+
) -> None:
90+
pass
91+
92+
@c_call("dds_qos_provider_get_qos")
93+
def _qos_provider_get_qos(
94+
self, provider: dds_c_t.qos_provider_p, qos_type: dds_c_t.qos_kind,
95+
key: ct.c_char_p, qos: ct.POINTER(dds_c_t.qos_p)
96+
) -> dds_c_t.returnv:
97+
pass

tests/test_qos_provider.py

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import pytest
2+
import os
3+
from pathlib import Path
4+
from cyclonedds.core import DDSException
5+
from cyclonedds.qos_provider import QosProvider
6+
from cyclonedds.qos import (
7+
DataReaderQos, DataWriterQos, DomainParticipantQos, PublisherQos, Qos,
8+
SubscriberQos, TopicQos,
9+
)
10+
11+
12+
@pytest.fixture(scope="function")
13+
def sysdef_file(tmp_path):
14+
q_str = """<dds>
15+
<qos_library name="qoslib01">
16+
<qos_profile name="qosprofile01">
17+
<datareader_qos>
18+
<history><kind>KEEP_LAST_HISTORY_QOS</kind><depth>5</depth></history>
19+
</datareader_qos>
20+
<datawriter_qos>
21+
<history><kind>KEEP_LAST_HISTORY_QOS</kind><depth>5</depth></history>
22+
</datawriter_qos>
23+
<topic_qos>
24+
<resource_limits><max_instances>5</max_instances></resource_limits>
25+
</topic_qos>
26+
<publisher_qos>
27+
<partition><name><element>part02</element></name></partition>
28+
</publisher_qos>
29+
<subscriber_qos>
30+
<partition><name><element>part01</element></name></partition>
31+
</subscriber_qos>
32+
<domain_participant_qos>
33+
<user_data><value>ZCBzdHJpbmcgZXhhbXBsZQ==</value></user_data>
34+
</domain_participant_qos>
35+
</qos_profile>
36+
</qos_library>
37+
</dds>"""
38+
tmp_file = (tmp_path / "sysdef_common_test.xml")
39+
assert tmp_file.write_text(q_str) != 0
40+
return tmp_file
41+
42+
43+
def test_qos_provider_create(sysdef_file):
44+
qp0 = QosProvider(sysdef_file, "*::qosprofile01")
45+
wr_q = qp0.get_datawriter_qos("qoslib01::qosprofile01")
46+
assert isinstance(wr_q, DataWriterQos)
47+
qp1 = QosProvider("<dds></dds>")
48+
ret = True
49+
try:
50+
wr_q = qp1.get_datawriter_qos("qoslib01::qosprofile01")
51+
except DDSException:
52+
assert ret # nothing to get from empty definition
53+
ret = False
54+
assert not ret
55+
56+
57+
def test_qos_provider_get_topic(sysdef_file):
58+
qp = QosProvider(sysdef_file, "*::*")
59+
qs = Qos.fromdict({"ResourceLimits": {"max_instances": 5}}).topic()
60+
assert qp.get_topic_qos("qoslib01::qosprofile01") == qs
61+
62+
63+
def test_qos_provider_get_subscriber(sysdef_file):
64+
qp = QosProvider(str(sysdef_file), "::::")
65+
qs = Qos.fromdict({"Partition": {"partitions": ["part01"]}}).subscriber()
66+
assert qp.get_subscriber_qos("qoslib01::qosprofile01") == qs
67+
68+
69+
def test_qos_provider_get_publisher(sysdef_file):
70+
qp = QosProvider(str(sysdef_file), "::")
71+
qs = Qos.fromdict({"Partition": {"partitions": ["part02"]}}).publisher()
72+
assert qp.get_publisher_qos("qoslib01::qosprofile01") == qs
73+
74+
75+
def test_qos_provider_get_datareader(sysdef_file):
76+
qp = QosProvider(str(sysdef_file).encode('utf-8'), "")
77+
qs = Qos.fromdict({"History": {"kind": "KeepLast", "depth": 5}}).datareader()
78+
assert qp.get_datareader_qos("qoslib01::qosprofile01") == qs
79+
80+
81+
def test_qos_provider_get_datawriter(sysdef_file):
82+
qp = QosProvider(sysdef_file)
83+
qs = Qos.fromdict({"History": {"kind": "KeepLast", "depth": 5}}).datawriter()
84+
assert qp.get_datawriter_qos("qoslib01::qosprofile01") == qs
85+
86+
87+
def test_qos_provider_get_participant(sysdef_file):
88+
qp = QosProvider(sysdef_file)
89+
qs = Qos.fromdict({"Userdata": {"data": "ZCBzdHJpbmcgZXhhbXBsZQ=="}}).domain_participant()
90+
assert qp.get_domain_participant_qos("qoslib01::qosprofile01") == qs

0 commit comments

Comments
 (0)