Skip to content

Commit 8a4b7f6

Browse files
authored
Fix stream handling (#109)
* Fix stream handling * Update * Uncrustified Co-authored-by: GitHub Actions Bot <>
1 parent 68adec1 commit 8a4b7f6

File tree

14 files changed

+72
-64
lines changed

14 files changed

+72
-64
lines changed

rmw_microxrcedds_c/src/callbacks.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ void on_request(
106106
while (service_item != NULL)
107107
{
108108
rmw_uxrce_service_t* custom_service = (rmw_uxrce_service_t*)service_item->data;
109-
if (custom_service->request_id == request_id)
109+
if (custom_service->service_data_resquest == request_id)
110110
{
111111
custom_service->micro_buffer_lenght[custom_service->history_write_index] = length;
112112
ucdr_deserialize_array_uint8_t(
@@ -151,7 +151,7 @@ void on_reply(
151151
while (client_item != NULL)
152152
{
153153
rmw_uxrce_client_t* custom_client = (rmw_uxrce_client_t*)client_item->data;
154-
if (custom_client->request_id == request_id)
154+
if (custom_client->client_data_request == request_id)
155155
{
156156
custom_client->micro_buffer_lenght[custom_client->history_write_index] = length;
157157
ucdr_deserialize_array_uint8_t(

rmw_microxrcedds_c/src/rmw_client.c

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ rmw_create_client(
3535
const rmw_qos_profile_t* qos_policies)
3636
{
3737
EPROS_PRINT_TRACE()
38-
rmw_client_t * rmw_client = NULL;
38+
rmw_client_t* rmw_client = NULL;
3939
if (!node)
4040
{
4141
RMW_SET_ERROR_MSG("node handle is null");
@@ -145,12 +145,13 @@ rmw_create_client(
145145
}
146146
client_req = uxr_buffer_create_requester_xml(
147147
&custom_node->context->session,
148-
custom_node->context->reliable_output, custom_client->client_id,
148+
*custom_node->context->creation_destroy_stream,
149+
custom_client->client_id,
149150
custom_node->participant_id, rmw_uxrce_xml_buffer, UXR_REPLACE);
150151
#elif defined(RMW_UXRCE_TRANSPORT_USE_REFS)
151152
// TODO(pablogs9): Is possible to instantiate a replier by ref?
152153
// client_req = uxr_buffer_create_replier_ref(&custom_node->context->session,
153-
// custom_node->context->reliable_output, custom_service->subscriber_id,
154+
// *custom_node->context->creation_destroy_stream, custom_service->subscriber_id,
154155
// custom_node->participant_id, "", UXR_REPLACE);
155156
#endif
156157

@@ -169,14 +170,19 @@ rmw_create_client(
169170
delivery_control.max_bytes_per_second = UXR_MAX_BYTES_PER_SECOND_UNLIMITED;
170171

171172
custom_client->stream_id =
173+
(qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) ?
174+
custom_node->context->best_effort_output :
175+
custom_node->context->reliable_output;
176+
177+
uxrStreamId data_request_stream_id =
172178
(qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) ?
173179
custom_node->context->best_effort_input :
174180
custom_node->context->reliable_input;
175181

176-
custom_client->request_id = uxr_buffer_request_data(
182+
custom_client->client_data_request = uxr_buffer_request_data(
177183
&custom_node->context->session,
178-
custom_node->context->reliable_output, custom_client->client_id,
179-
custom_client->stream_id, &delivery_control);
184+
*custom_node->context->creation_destroy_stream, custom_client->client_id,
185+
data_request_stream_id, &delivery_control);
180186
}
181187
return(rmw_client);
182188

rmw_microxrcedds_c/src/rmw_graph.c

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ rmw_ret_t rmw_graph_init(
5353

5454
uint16_t participant_req = uxr_buffer_create_participant_xml(
5555
&context->session,
56-
context->reliable_output,
56+
*context->creation_destroy_stream,
5757
graph_info->participant_id, (int16_t)microros_domain_id,
5858
rmw_uxrce_xml_buffer, UXR_REPLACE);
5959

@@ -84,7 +84,7 @@ rmw_ret_t rmw_graph_init(
8484
}
8585

8686
uint16_t subscriber_req = uxr_buffer_create_subscriber_xml(
87-
&context->session, context->reliable_output, graph_info->subscriber_id,
87+
&context->session, *context->creation_destroy_stream, graph_info->subscriber_id,
8888
graph_info->participant_id, rmw_uxrce_xml_buffer, UXR_REPLACE);
8989

9090
graph_info->datareader_id = uxr_object_id(context->id_datareader++, UXR_DATAREADER_ID);
@@ -105,7 +105,7 @@ rmw_ret_t rmw_graph_init(
105105
}
106106

107107
uint16_t topic_req = uxr_buffer_create_topic_xml(
108-
&context->session, context->reliable_output, graph_info->topic_id,
108+
&context->session, *context->creation_destroy_stream, graph_info->topic_id,
109109
graph_info->participant_id, rmw_uxrce_xml_buffer, UXR_REPLACE);
110110

111111
// Create graph datareader request
@@ -120,7 +120,7 @@ rmw_ret_t rmw_graph_init(
120120
}
121121

122122
uint16_t datareader_req = uxr_buffer_create_datareader_xml(
123-
&context->session, context->reliable_output, graph_info->datareader_id,
123+
&context->session, *context->creation_destroy_stream, graph_info->datareader_id,
124124
graph_info->subscriber_id, rmw_uxrce_xml_buffer, UXR_REPLACE);
125125

126126
// Run session
@@ -142,7 +142,7 @@ rmw_ret_t rmw_graph_init(
142142
delivery_control.max_elapsed_time = UXR_MAX_ELAPSED_TIME_UNLIMITED;
143143
delivery_control.max_bytes_per_second = UXR_MAX_BYTES_PER_SECOND_UNLIMITED;
144144

145-
graph_info->subscription_request = uxr_buffer_request_data(
145+
uxr_buffer_request_data(
146146
&context->session,
147147
context->reliable_output, graph_info->datareader_id,
148148
context->reliable_input, &delivery_control);

rmw_microxrcedds_c/src/rmw_guard_condition.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ rmw_create_guard_condition(rmw_context_t* context)
2323
(void)context;
2424
EPROS_PRINT_TRACE()
2525

26-
rmw_guard_condition_t * rmw_guard_condition = (rmw_guard_condition_t*)rmw_allocate(
26+
rmw_guard_condition_t* rmw_guard_condition = (rmw_guard_condition_t*)rmw_allocate(
2727
sizeof(rmw_guard_condition_t));
2828

2929
rmw_guard_condition->context = context;

rmw_microxrcedds_c/src/rmw_microxrcedds_topic.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ create_topic(
6565

6666
topic_req = uxr_buffer_create_topic_xml(
6767
&custom_node->context->session,
68-
custom_node->context->reliable_output, custom_topic->topic_id,
68+
*custom_node->context->creation_destroy_stream, custom_topic->topic_id,
6969
custom_node->participant_id, rmw_uxrce_xml_buffer, UXR_REPLACE);
7070
#elif defined(RMW_UXRCE_TRANSPORT_USE_REFS)
7171
(void)qos_policies;
@@ -79,7 +79,7 @@ create_topic(
7979

8080
topic_req = uxr_buffer_create_topic_ref(
8181
&custom_node->context->session,
82-
custom_node->context->reliable_output, custom_topic->topic_id,
82+
*custom_node->context->creation_destroy_stream, custom_topic->topic_id,
8383
custom_node->participant_id, rmw_uxrce_profile_name, UXR_REPLACE);
8484
#endif
8585

rmw_microxrcedds_c/src/rmw_node.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ rmw_node_t* create_node(
9191
participant_req =
9292
uxr_buffer_create_participant_xml(
9393
&node_info->context->session,
94-
node_info->context->reliable_output,
94+
*node_info->context->creation_destroy_stream,
9595
node_info->participant_id, (uint16_t)domain_id, rmw_uxrce_xml_buffer, UXR_REPLACE);
9696
#elif defined(RMW_UXRCE_TRANSPORT_USE_REFS)
9797
if (!build_participant_profile(rmw_uxrce_profile_name, sizeof(rmw_uxrce_profile_name)))
@@ -102,7 +102,7 @@ rmw_node_t* create_node(
102102
participant_req =
103103
uxr_buffer_create_participant_ref(
104104
&node_info->context->session,
105-
node_info->context->reliable_output,
105+
*node_info->context->creation_destroy_stream,
106106
node_info->participant_id, (uint16_t)domain_id, rmw_uxrce_profile_name, UXR_REPLACE);
107107
#endif
108108

@@ -134,7 +134,7 @@ rmw_create_node(
134134
(void)context;
135135
(void)localhost_only;
136136
EPROS_PRINT_TRACE()
137-
rmw_node_t * rmw_node = NULL;
137+
rmw_node_t* rmw_node = NULL;
138138
if (!name || strlen(name) == 0)
139139
{
140140
RMW_SET_ERROR_MSG("name is null");

rmw_microxrcedds_c/src/rmw_publisher.c

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ rmw_create_publisher(
6767
(void)publisher_options;
6868

6969
EPROS_PRINT_TRACE()
70-
rmw_publisher_t * rmw_publisher = NULL;
70+
rmw_publisher_t* rmw_publisher = NULL;
7171
if (!node)
7272
{
7373
RMW_SET_ERROR_MSG("node handle is null");
@@ -119,8 +119,8 @@ rmw_create_publisher(
119119

120120
custom_publisher->stream_id =
121121
(qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) ?
122-
custom_node->context->best_effort_input :
123-
custom_node->context->reliable_input;
122+
custom_node->context->best_effort_output :
123+
custom_node->context->reliable_output;
124124

125125
custom_publisher->cs_cb_size = NULL;
126126
custom_publisher->cs_cb_serialization = NULL;
@@ -185,13 +185,13 @@ rmw_create_publisher(
185185
}
186186
publisher_req = uxr_buffer_create_publisher_xml(
187187
&custom_publisher->owner_node->context->session,
188-
custom_node->context->reliable_output,
188+
*custom_node->context->creation_destroy_stream,
189189
custom_publisher->publisher_id,
190190
custom_node->participant_id, rmw_uxrce_xml_buffer, UXR_REPLACE);
191191
#elif defined(RMW_UXRCE_TRANSPORT_USE_REFS)
192192
publisher_req = uxr_buffer_create_publisher_xml(
193193
&custom_publisher->owner_node->context->session,
194-
custom_node->context->reliable_output,
194+
*custom_node->context->creation_destroy_stream,
195195
custom_publisher->publisher_id,
196196
custom_node->participant_id, "", UXR_REPLACE);
197197
#endif
@@ -220,7 +220,7 @@ rmw_create_publisher(
220220

221221
datawriter_req = uxr_buffer_create_datawriter_xml(
222222
&custom_publisher->owner_node->context->session,
223-
custom_node->context->reliable_output,
223+
*custom_node->context->creation_destroy_stream,
224224
custom_publisher->datawriter_id,
225225
custom_publisher->publisher_id, rmw_uxrce_xml_buffer, UXR_REPLACE);
226226
#elif defined(RMW_UXRCE_TRANSPORT_USE_REFS)
@@ -232,7 +232,7 @@ rmw_create_publisher(
232232

233233
datawriter_req = uxr_buffer_create_datawriter_ref(
234234
&custom_publisher->owner_node->context->session,
235-
custom_node->context->reliable_output,
235+
*custom_node->context->creation_destroy_stream,
236236
custom_publisher->datawriter_id,
237237
custom_publisher->publisher_id, rmw_uxrce_profile_name, UXR_REPLACE);
238238
#endif

rmw_microxrcedds_c/src/rmw_request.c

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,12 @@ rmw_send_request(
4040
(const message_type_support_callbacks_t*)req_members->data;
4141

4242
ucdrBuffer mb;
43-
uint32_t request_length = functions->get_serialized_size(ros_request);
43+
uint32_t request_length = functions->get_serialized_size(ros_request);
4444
*sequence_id = uxr_prepare_output_stream(
45-
&custom_node->context->session,
46-
custom_client->stream_id, custom_client->client_id, &mb,
47-
request_length);
48-
45+
&custom_node->context->session,
46+
custom_client->stream_id, custom_client->client_id, &mb,
47+
request_length);
48+
4949
functions->cdr_serialize(ros_request, &mb);
5050

5151
if (UXR_INVALID_REQUEST_ID == *sequence_id)

rmw_microxrcedds_c/src/rmw_response.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,12 @@ rmw_send_response(
5252
(const message_type_support_callbacks_t*)res_members->data;
5353

5454
ucdrBuffer mb;
55-
uint32_t response_length = functions->get_serialized_size(ros_response) + 24; // Adding sample indentity size
56-
uint16_t rc = uxr_prepare_output_stream(
57-
&custom_node->context->session,
58-
custom_service->stream_id, custom_service->service_id, &mb,
59-
response_length);
60-
55+
uint32_t response_length = functions->get_serialized_size(ros_response) + 24; // Adding sample indentity size
56+
uint16_t rc = uxr_prepare_output_stream(
57+
&custom_node->context->session,
58+
custom_service->stream_id, custom_service->service_id, &mb,
59+
response_length);
60+
6161
uxr_serialize_SampleIdentity(&mb, &sample_id);
6262
functions->cdr_serialize(ros_response, &mb);
6363

rmw_microxrcedds_c/src/rmw_service.c

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ rmw_create_service(
3535
const rmw_qos_profile_t* qos_policies)
3636
{
3737
EPROS_PRINT_TRACE()
38-
rmw_service_t * rmw_service = NULL;
38+
rmw_service_t* rmw_service = NULL;
3939
if (!node)
4040
{
4141
RMW_SET_ERROR_MSG("node handle is null");
@@ -142,12 +142,12 @@ rmw_create_service(
142142
}
143143
service_req = uxr_buffer_create_replier_xml(
144144
&custom_node->context->session,
145-
custom_node->context->reliable_output, custom_service->service_id,
145+
*custom_node->context->creation_destroy_stream, custom_service->service_id,
146146
custom_node->participant_id, rmw_uxrce_xml_buffer, UXR_REPLACE);
147147
#elif defined(RMW_UXRCE_TRANSPORT_USE_REFS)
148148
// CHECK IF THIS IS NECESSARY
149149
// service_req = uxr_buffer_create_replier_ref(&custom_node->context->session,
150-
// custom_node->context->reliable_output, custom_service->subscriber_id,
150+
// *custom_node->context->creation_destroy_stream, custom_service->subscriber_id,
151151
// custom_node->participant_id, "", UXR_REPLACE);
152152
#endif
153153

@@ -167,14 +167,19 @@ rmw_create_service(
167167
delivery_control.max_bytes_per_second = UXR_MAX_BYTES_PER_SECOND_UNLIMITED;
168168

169169
custom_service->stream_id =
170+
(qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) ?
171+
custom_node->context->best_effort_output :
172+
custom_node->context->reliable_output;
173+
174+
uxrStreamId data_request_stream_id =
170175
(qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) ?
171176
custom_node->context->best_effort_input :
172177
custom_node->context->reliable_input;
173178

174-
custom_service->request_id = uxr_buffer_request_data(
179+
custom_service->service_data_resquest = uxr_buffer_request_data(
175180
&custom_node->context->session,
176-
custom_node->context->reliable_output, custom_service->service_id,
177-
custom_service->stream_id, &delivery_control);
181+
*custom_node->context->creation_destroy_stream, custom_service->service_id,
182+
data_request_stream_id, &delivery_control);
178183
}
179184
return(rmw_service);
180185

0 commit comments

Comments
 (0)