diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1940e93d..26a1d0ba 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -19,12 +19,12 @@ jobs: - name: Download dependencies run: | git clone -b foxy https://github.com/eProsima/Micro-CDR src/Micro-CDR - git clone -b foxy https://github.com/eProsima/Micro-XRCE-DDS-Client src/Micro-XRCE-DDS-Client + git clone -b feature/brokerless_p2p https://github.com/eProsima/Micro-XRCE-DDS-Client src/Micro-XRCE-DDS-Client git clone -b foxy https://github.com/micro-ROS/rosidl_typesupport_microxrcedds src/rosidl_typesupport_microxrcedds touch src/rosidl_typesupport_microxrcedds/test/COLCON_IGNORE - name: Build - run: . /opt/ros/foxy/setup.sh && colcon build + run: . /opt/ros/foxy/setup.sh && colcon build --cmake-args -DUCLIENT_MAX_SESSION_CONNECTION_ATTEMPTS=1 - name: Test run: | diff --git a/rmw_microxrcedds_c/src/callbacks.c b/rmw_microxrcedds_c/src/callbacks.c index c357531c..d7750f16 100644 --- a/rmw_microxrcedds_c/src/callbacks.c +++ b/rmw_microxrcedds_c/src/callbacks.c @@ -49,7 +49,7 @@ void on_topic( if ((custom_subscription->datareader_id.id == object_id.id) && (custom_subscription->datareader_id.type == object_id.type)) { - custom_subscription->micro_buffer_lenght[custom_subscription->history_write_index] = length; + custom_subscription->micro_buffer_length[custom_subscription->history_write_index] = length; ucdr_deserialize_array_uint8_t( ub, custom_subscription->micro_buffer[custom_subscription->history_write_index], length); @@ -89,7 +89,7 @@ void on_request( while (service_item != NULL) { rmw_uxrce_service_t * custom_service = (rmw_uxrce_service_t *)service_item->data; if (custom_service->request_id == request_id) { - custom_service->micro_buffer_lenght[custom_service->history_write_index] = length; + custom_service->micro_buffer_length[custom_service->history_write_index] = length; ucdr_deserialize_array_uint8_t( ub, custom_service->micro_buffer[custom_service->history_write_index], length); @@ -132,7 +132,7 @@ void on_reply( while (client_item != NULL) { rmw_uxrce_client_t * custom_client = (rmw_uxrce_client_t *)client_item->data; if (custom_client->request_id == request_id) { - custom_client->micro_buffer_lenght[custom_client->history_write_index] = length; + custom_client->micro_buffer_length[custom_client->history_write_index] = length; ucdr_deserialize_array_uint8_t( ub, custom_client->micro_buffer[custom_client->history_write_index], length); diff --git a/rmw_microxrcedds_c/src/memory.c b/rmw_microxrcedds_c/src/memory.c index b36789e8..c92f35d9 100644 --- a/rmw_microxrcedds_c/src/memory.c +++ b/rmw_microxrcedds_c/src/memory.c @@ -91,6 +91,18 @@ struct rmw_uxrce_mempool_item_t * get_memory(struct rmw_uxrce_mempool_t * mem) return item; } +bool check_already_freed(struct rmw_uxrce_mempool_t * mem, struct rmw_uxrce_mempool_item_t * item){ + + struct rmw_uxrce_mempool_item_t * i = mem->freeitems; + while (i != NULL) { + if (i == item){ + return true; + } + i = i->next; + } + return false; +} + void put_memory(struct rmw_uxrce_mempool_t * mem, struct rmw_uxrce_mempool_item_t * item) { // Gets item from allocated pool diff --git a/rmw_microxrcedds_c/src/memory.h b/rmw_microxrcedds_c/src/memory.h index 808e1227..1a0c335c 100644 --- a/rmw_microxrcedds_c/src/memory.h +++ b/rmw_microxrcedds_c/src/memory.h @@ -42,6 +42,7 @@ void set_mem_pool(struct rmw_uxrce_mempool_t * mem, struct rmw_uxrce_mempool_ite bool has_memory(struct rmw_uxrce_mempool_t * mem); struct rmw_uxrce_mempool_item_t * get_memory(struct rmw_uxrce_mempool_t * mem); void put_memory(struct rmw_uxrce_mempool_t * mem, struct rmw_uxrce_mempool_item_t * item); +bool check_already_freed(struct rmw_uxrce_mempool_t * mem, struct rmw_uxrce_mempool_item_t * item); void free_mem_pool(struct rmw_uxrce_mempool_t * mem); #endif // MEMORY_H_ diff --git a/rmw_microxrcedds_c/src/rmw_client.c b/rmw_microxrcedds_c/src/rmw_client.c index 6352d65b..1ab5376d 100644 --- a/rmw_microxrcedds_c/src/rmw_client.c +++ b/rmw_microxrcedds_c/src/rmw_client.c @@ -127,13 +127,24 @@ rmw_create_client( } client_req = uxr_buffer_create_requester_xml( &custom_node->context->session, - custom_node->context->reliable_output, custom_client->client_id, - custom_node->participant_id, rmw_uxrce_xml_buffer, UXR_REPLACE); + *custom_node->context->entity_creation_output, + custom_client->client_id, + custom_node->participant_id, + rmw_uxrce_xml_buffer, + UXR_REPLACE); #elif defined(MICRO_XRCEDDS_USE_REFS) - // TODO(pablogs9): Is possible to instantiate a replier by ref? - // client_req = uxr_buffer_create_replier_ref(&custom_node->context->session, - // custom_node->context->reliable_output, custom_service->subscriber_id, - // custom_node->participant_id, "", UXR_REPLACE); + if (!build_requester_profile(service_name, rmw_uxrce_profile_name, sizeof(rmw_uxrce_profile_name))) { + RMW_SET_ERROR_MSG("failed to generate ref request for client creation"); + goto fail; + } + + client_req = uxr_buffer_create_requester_ref( + &custom_node->context->session, + *custom_node->context->entity_creation_output, + custom_client->client_id, + custom_node->participant_id, + rmw_uxrce_profile_name, + UXR_REPLACE); #endif rmw_client->data = custom_client; @@ -144,7 +155,7 @@ rmw_create_client( &custom_node->context->session, 1000, requests, status, 1)) { - RMW_SET_ERROR_MSG("Issues creating Micro XRCE-DDS entities"); + RMW_SET_ERROR_MSG("Issues creating Requester Micro XRCE-DDS entities"); put_memory(&client_memory, &custom_client->mem); goto fail; } @@ -162,8 +173,10 @@ rmw_create_client( custom_client->request_id = uxr_buffer_request_data( &custom_node->context->session, - custom_node->context->reliable_output, custom_client->client_id, - custom_client->stream_id, &delivery_control); + *custom_node->context->entity_creation_output, + custom_client->client_id, + custom_client->stream_id, + &delivery_control); } return rmw_client; @@ -203,7 +216,8 @@ rmw_destroy_client( rmw_uxrce_client_t * custom_client = (rmw_uxrce_client_t *)client->data; uint16_t delete_client = uxr_buffer_delete_entity( - &custom_node->context->session, custom_node->context->reliable_output, + &custom_node->context->session, + *custom_node->context->entity_creation_output, custom_client->client_id); uint16_t requests[] = {delete_client}; @@ -212,7 +226,7 @@ rmw_destroy_client( &custom_node->context->session, 1000, requests, status, sizeof(status))) { - RMW_SET_ERROR_MSG("unable to remove client from the server"); + RMW_SET_ERROR_MSG("Unable to remove client from the server"); result_ret = RMW_RET_ERROR; } else { rmw_uxrce_fini_client_memory(client); diff --git a/rmw_microxrcedds_c/src/rmw_init.c b/rmw_microxrcedds_c/src/rmw_init.c index 0d924a89..37cd43fb 100644 --- a/rmw_microxrcedds_c/src/rmw_init.c +++ b/rmw_microxrcedds_c/src/rmw_init.c @@ -33,9 +33,13 @@ #include #endif +#ifdef UCLIENT_PROFILE_BROKERLESS +#include +#endif + #if defined(MICRO_XRCEDDS_SERIAL) || defined(MICRO_XRCEDDS_CUSTOM_SERIAL) #define CLOSE_TRANSPORT(transport) uxr_close_serial_transport(transport) -#elif defined(MICRO_XRCEDDS_UDP) +#elif defined(MICRO_XRCEDDS_UDP) && !defined(UCLIENT_PROFILE_BROKERLESS) #define CLOSE_TRANSPORT(transport) uxr_close_udp_transport(transport) #else #define CLOSE_TRANSPORT(transport) @@ -245,8 +249,7 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) } printf("Serial mode => dev: %s\n", context_impl->connection_params.serial_device); -#elif defined(MICRO_XRCEDDS_UDP) - // TODO(Borja) Think how we are going to select transport to use +#elif defined(MICRO_XRCEDDS_UDP) && !defined(UCLIENT_PROFILE_BROKERLESS) #ifdef MICRO_XRCEDDS_IPV4 uxrIpProtocol ip_protocol = UXR_IPv4; #elif defined(MICRO_XRCEDDS_IPV6) @@ -263,6 +266,11 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) printf( "UDP mode => ip: %s - port: %s\n", context_impl->connection_params.agent_address, context_impl->connection_params.agent_port); +#elif defined(MICRO_XRCEDDS_UDP) && defined(UCLIENT_PROFILE_BROKERLESS) + + context_impl->transport.comm = brokerless_comm_stub; + + printf("UDP mode => Brokerless P2P\n"); #elif defined(MICRO_XRCEDDS_CUSTOM_SERIAL) int pseudo_fd = 0; if (strlen(options->impl->connection_params.serial_device) > 0) { @@ -289,23 +297,26 @@ rmw_init(const rmw_init_options_t * options, rmw_context_t * context) context_impl->reliable_input = uxr_create_input_reliable_stream( &context_impl->session, context_impl->input_reliable_stream_buffer, - context_impl->transport.comm.mtu * RMW_UXRCE_STREAM_HISTORY, RMW_UXRCE_STREAM_HISTORY); - context_impl->reliable_output = - uxr_create_output_reliable_stream( + RMW_UXRCE_MAX_BUFFER_SIZE, RMW_UXRCE_STREAM_HISTORY); + context_impl->reliable_output = uxr_create_output_reliable_stream( &context_impl->session, context_impl->output_reliable_stream_buffer, - context_impl->transport.comm.mtu * RMW_UXRCE_STREAM_HISTORY, RMW_UXRCE_STREAM_HISTORY); + RMW_UXRCE_MAX_BUFFER_SIZE, RMW_UXRCE_STREAM_HISTORY); context_impl->best_effort_input = uxr_create_input_best_effort_stream(&context_impl->session); context_impl->best_effort_output = uxr_create_output_best_effort_stream( &context_impl->session, - context_impl->output_best_effort_stream_buffer, context_impl->transport.comm.mtu); - + context_impl->output_best_effort_stream_buffer, RMW_UXRCE_MAX_TRANSPORT_MTU); +#ifndef UCLIENT_PROFILE_BROKERLESS + context_impl->entity_creation_output = &context_impl->reliable_output; if (!uxr_create_session(&context_impl->session)) { CLOSE_TRANSPORT(&context_impl->transport); RMW_SET_ERROR_MSG("failed to create node session on Micro ROS Agent."); return RMW_RET_ERROR; } +#else + context_impl->entity_creation_output = &context_impl->best_effort_output; +#endif return RMW_RET_OK; } diff --git a/rmw_microxrcedds_c/src/rmw_microxrcedds_topic.c b/rmw_microxrcedds_c/src/rmw_microxrcedds_topic.c index a7099aa1..d1d41726 100644 --- a/rmw_microxrcedds_c/src/rmw_microxrcedds_topic.c +++ b/rmw_microxrcedds_c/src/rmw_microxrcedds_topic.c @@ -21,7 +21,6 @@ #include "./utils.h" - rmw_uxrce_topic_t * create_topic( struct rmw_uxrce_node_t * custom_node, @@ -34,7 +33,6 @@ create_topic( RMW_SET_ERROR_MSG("Not available memory node"); goto fail; } - rmw_uxrce_topic_t * custom_topic = (rmw_uxrce_topic_t *)memory_node->data; @@ -63,8 +61,11 @@ create_topic( topic_req = uxr_buffer_create_topic_xml( &custom_node->context->session, - custom_node->context->reliable_output, custom_topic->topic_id, - custom_node->participant_id, rmw_uxrce_xml_buffer, UXR_REPLACE); + *custom_node->context->entity_creation_output, + custom_topic->topic_id, + custom_node->participant_id, + rmw_uxrce_xml_buffer, + UXR_REPLACE); #elif defined(MICRO_XRCEDDS_USE_REFS) (void)qos_policies; if (!build_topic_profile(topic_name, rmw_uxrce_profile_name, sizeof(rmw_uxrce_profile_name))) { @@ -76,18 +77,23 @@ create_topic( topic_req = uxr_buffer_create_topic_ref( &custom_node->context->session, - custom_node->context->reliable_output, custom_topic->topic_id, - custom_node->participant_id, rmw_uxrce_profile_name, UXR_REPLACE); + *custom_node->context->entity_creation_output, + custom_topic->topic_id, + custom_node->participant_id, + rmw_uxrce_profile_name, + UXR_REPLACE); #endif // Send the request and wait for response uint8_t status; - custom_topic->sync_with_agent = - uxr_run_session_until_all_status( - &custom_node->context->session, 1000, &topic_req, - &status, 1); - if (!custom_topic->sync_with_agent) { - RMW_SET_ERROR_MSG("Issues creating micro XRCE-DDS entities"); + + if (UXR_BEST_EFFORT_STREAM == custom_node->context->entity_creation_output->type) + { + uxr_flash_output_streams(&custom_node->context->session); + } + else if(!(custom_topic->sync_with_agent = uxr_run_session_until_all_status(&custom_node->context->session, 1000, &topic_req, &status, 1))) + { + RMW_SET_ERROR_MSG("Issues creating Topic Micro XRCE-DDS entities"); rmw_uxrce_fini_topic_memory(custom_topic); custom_topic = NULL; goto fail; @@ -101,18 +107,30 @@ rmw_ret_t destroy_topic(rmw_uxrce_topic_t * topic) { rmw_ret_t result_ret = RMW_RET_OK; + // TODO (pablogs9): Implement this in all destroy functions; + if (check_already_freed(&topics_memory, &topic->mem)) + { + return RMW_RET_ERROR; + } + uint16_t delete_topic = uxr_buffer_delete_entity( &topic->owner_node->context->session, - topic->owner_node->context->reliable_output, + *topic->owner_node->context->entity_creation_output, topic->topic_id); uint16_t requests[] = {delete_topic}; uint8_t status[1]; - if (!uxr_run_session_until_all_status( + + if (UXR_BEST_EFFORT_STREAM == topic->owner_node->context->entity_creation_output->type) + { + uxr_flash_output_streams(&topic->owner_node->context->session); + rmw_uxrce_fini_topic_memory(topic); + } + else if (!uxr_run_session_until_all_status( &topic->owner_node->context->session, 1000, requests, status, 1)) { - RMW_SET_ERROR_MSG("unable to remove topic from the server"); + RMW_SET_ERROR_MSG("Unable to remove topic from the server"); result_ret = RMW_RET_ERROR; } else { rmw_uxrce_fini_topic_memory(topic); diff --git a/rmw_microxrcedds_c/src/rmw_node.c b/rmw_microxrcedds_c/src/rmw_node.c index 03b52705..4ec48067 100644 --- a/rmw_microxrcedds_c/src/rmw_node.c +++ b/rmw_microxrcedds_c/src/rmw_node.c @@ -85,8 +85,11 @@ rmw_node_t * create_node( participant_req = uxr_buffer_create_participant_xml( &node_info->context->session, - node_info->context->reliable_output, - node_info->participant_id, (uint16_t)domain_id, rmw_uxrce_xml_buffer, UXR_REPLACE); + *node_info->context->entity_creation_output, + node_info->participant_id, + (uint16_t)domain_id, + rmw_uxrce_xml_buffer, + UXR_REPLACE); #elif defined(MICRO_XRCEDDS_USE_REFS) if (!build_participant_profile(rmw_uxrce_profile_name, sizeof(rmw_uxrce_profile_name))) { RMW_SET_ERROR_MSG("failed to generate xml request for node creation"); @@ -95,15 +98,24 @@ rmw_node_t * create_node( participant_req = uxr_buffer_create_participant_ref( &node_info->context->session, - node_info->context->reliable_output, - node_info->participant_id, (uint16_t)domain_id, rmw_uxrce_profile_name, UXR_REPLACE); + *custom_node->context->entity_creation_output, + node_info->participant_id, + (uint16_t)domain_id, + rmw_uxrce_profile_name, + UXR_REPLACE); #endif + uint8_t status[1]; uint16_t requests[] = {participant_req}; - if (!uxr_run_session_until_all_status(&node_info->context->session, 1000, requests, status, 1)) { + if (UXR_BEST_EFFORT_STREAM == node_info->context->entity_creation_output->type) + { + uxr_flash_output_streams(&node_info->context->session); + } + else if(!uxr_run_session_until_all_status(&node_info->context->session, 1000, requests, status, 1)) + { rmw_uxrce_fini_node_memory(node_handle); - RMW_SET_ERROR_MSG("Issues creating micro XRCE-DDS entities"); + RMW_SET_ERROR_MSG("Issues creating Participant Micro XRCE-DDS entities"); return NULL; } @@ -201,14 +213,16 @@ rmw_ret_t rmw_destroy_node(rmw_node_t * node) uint16_t participant_req = uxr_buffer_delete_entity( &custom_node->context->session, - custom_node->context->reliable_output, + *custom_node->context->entity_creation_output, custom_node->participant_id); uint8_t status[1]; uint16_t requests[] = {participant_req}; - if (!uxr_run_session_until_all_status( - &custom_node->context->session, 1000, requests, status, - 1)) + if (UXR_BEST_EFFORT_STREAM == custom_node->context->entity_creation_output->type) + { + uxr_flash_output_streams(&custom_node->context->session); + } + else if(!uxr_run_session_until_all_status(&custom_node->context->session, 1000, requests, status,1)) { ret = RMW_RET_ERROR; } diff --git a/rmw_microxrcedds_c/src/rmw_publish.c b/rmw_microxrcedds_c/src/rmw_publish.c index 429c5e04..16083ea4 100644 --- a/rmw_microxrcedds_c/src/rmw_publish.c +++ b/rmw_microxrcedds_c/src/rmw_publish.c @@ -54,7 +54,7 @@ rmw_publish( { written = functions->cdr_serialize(ros_message, &mb); - if (UXR_BEST_EFFORT_STREAM == custom_publisher->stream_id.type) { + if (written && UXR_BEST_EFFORT_STREAM == custom_publisher->stream_id.type) { uxr_flash_output_streams(&custom_publisher->owner_node->context->session); } else { written &= uxr_run_session_until_confirm_delivery( diff --git a/rmw_microxrcedds_c/src/rmw_publisher.c b/rmw_microxrcedds_c/src/rmw_publisher.c index 7b73ee2a..0f4e39d6 100644 --- a/rmw_microxrcedds_c/src/rmw_publisher.c +++ b/rmw_microxrcedds_c/src/rmw_publisher.c @@ -99,9 +99,9 @@ rmw_create_publisher( memcpy(&custom_publisher->qos, qos_policies, sizeof(rmw_qos_profile_t)); custom_publisher->stream_id = - (qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) ? - custom_node->context->best_effort_input : - custom_node->context->reliable_input; + (qos_policies->reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) || (custom_node->context->entity_creation_output->type == UXR_BEST_EFFORT_STREAM) ? + custom_node->context->best_effort_output : + custom_node->context->reliable_output; const rosidl_message_type_support_t * type_support_xrce = NULL; #ifdef ROSIDL_TYPESUPPORT_MICROXRCEDDS_C__IDENTIFIER_VALUE @@ -156,15 +156,17 @@ rmw_create_publisher( } publisher_req = uxr_buffer_create_publisher_xml( &custom_publisher->owner_node->context->session, - custom_node->context->reliable_output, + *custom_node->context->entity_creation_output, custom_publisher->publisher_id, custom_node->participant_id, rmw_uxrce_xml_buffer, UXR_REPLACE); #elif defined(MICRO_XRCEDDS_USE_REFS) publisher_req = uxr_buffer_create_publisher_xml( &custom_publisher->owner_node->context->session, - custom_node->context->reliable_output, + *custom_node->context->entity_creation_output, custom_publisher->publisher_id, - custom_node->participant_id, "", UXR_REPLACE); + custom_node->participant_id, + "", + UXR_REPLACE); #endif custom_publisher->datawriter_id = uxr_object_id( @@ -183,31 +185,40 @@ rmw_create_publisher( datawriter_req = uxr_buffer_create_datawriter_xml( &custom_publisher->owner_node->context->session, - custom_node->context->reliable_output, + *custom_node->context->entity_creation_output, custom_publisher->datawriter_id, - custom_publisher->publisher_id, rmw_uxrce_xml_buffer, UXR_REPLACE); + custom_publisher->publisher_id, + rmw_uxrce_xml_buffer, + UXR_REPLACE); #elif defined(MICRO_XRCEDDS_USE_REFS) if (!build_datawriter_profile(topic_name, rmw_uxrce_profile_name, sizeof(rmw_uxrce_profile_name))) { - RMW_SET_ERROR_MSG("failed to generate xml request for node creation"); + RMW_SET_ERROR_MSG("failed to generate ref request datawriter creation"); goto fail; } datawriter_req = uxr_buffer_create_datawriter_ref( &custom_publisher->owner_node->context->session, - custom_node->context->reliable_output, + *custom_node->context->entity_creation_output, custom_publisher->datawriter_id, - custom_publisher->publisher_id, rmw_uxrce_profile_name, UXR_REPLACE); + custom_publisher->publisher_id, + rmw_uxrce_profile_name, + UXR_REPLACE); #endif rmw_publisher->data = custom_publisher; uint16_t requests[] = {publisher_req, datawriter_req}; uint8_t status[sizeof(requests) / 2]; - if (!uxr_run_session_until_all_status( - &custom_publisher->owner_node->context->session, 1000, requests, - status, sizeof(status))) + + if (UXR_BEST_EFFORT_STREAM == custom_node->context->entity_creation_output->type) + { + uxr_flash_output_streams(&custom_node->context->session); + } + else if(!uxr_run_session_until_all_status( + &custom_publisher->owner_node->context->session, 1000, requests, + status, sizeof(status))) { - RMW_SET_ERROR_MSG("Issues creating micro XRCE-DDS entities"); + RMW_SET_ERROR_MSG("Issues creating Publisher Micro XRCE-DDS entities"); put_memory(&publisher_memory, &custom_publisher->mem); goto fail; } @@ -311,25 +322,37 @@ rmw_destroy_publisher( uint16_t delete_writer = uxr_buffer_delete_entity( &custom_publisher->owner_node->context->session, - custom_publisher->owner_node->context->reliable_output, + *custom_publisher->owner_node->context->entity_creation_output, custom_publisher->datawriter_id); uint16_t delete_publisher = uxr_buffer_delete_entity( &custom_publisher->owner_node->context->session, - custom_publisher->owner_node->context->reliable_output, + *custom_publisher->owner_node->context->entity_creation_output, custom_publisher->publisher_id); uint16_t requests[] = {delete_writer, delete_publisher}; uint8_t status[sizeof(requests) / 2]; - if (!uxr_run_session_until_all_status( - &custom_publisher->owner_node->context->session, 1000, requests, status, - sizeof(status))) + + if (UXR_BEST_EFFORT_STREAM == custom_publisher->owner_node->context->entity_creation_output->type) { - RMW_SET_ERROR_MSG("unable to remove publisher from the server"); - result_ret = RMW_RET_ERROR; - } else { + uxr_flash_output_streams(&custom_publisher->owner_node->context->session); rmw_uxrce_fini_publisher_memory(publisher); result_ret = RMW_RET_OK; } + else + { + if (!uxr_run_session_until_all_status( + &custom_publisher->owner_node->context->session, 1000, requests, status, + sizeof(status))) + { + RMW_SET_ERROR_MSG("Unable to remove publisher from the server"); + result_ret = RMW_RET_ERROR; + } + else + { + rmw_uxrce_fini_publisher_memory(publisher); + result_ret = RMW_RET_OK; + } + } } return result_ret; diff --git a/rmw_microxrcedds_c/src/rmw_request.c b/rmw_microxrcedds_c/src/rmw_request.c index 4db47198..3e55c829 100644 --- a/rmw_microxrcedds_c/src/rmw_request.c +++ b/rmw_microxrcedds_c/src/rmw_request.c @@ -110,7 +110,7 @@ rmw_take_request( ucdrBuffer temp_buffer; ucdr_init_buffer( &temp_buffer, custom_service->micro_buffer[custom_service->history_read_index], - custom_service->micro_buffer_lenght[custom_service->history_read_index]); + custom_service->micro_buffer_length[custom_service->history_read_index]); bool deserialize_rv = functions->cdr_deserialize(&temp_buffer, ros_request); diff --git a/rmw_microxrcedds_c/src/rmw_response.c b/rmw_microxrcedds_c/src/rmw_response.c index bf0fe375..c88fa580 100644 --- a/rmw_microxrcedds_c/src/rmw_response.c +++ b/rmw_microxrcedds_c/src/rmw_response.c @@ -53,13 +53,13 @@ rmw_send_response( uint32_t topic_size = functions->get_serialized_size(ros_response); ucdrBuffer reply_ub; - ucdr_init_buffer(&reply_ub, custom_service->replay_buffer, sizeof(custom_service->replay_buffer)); + ucdr_init_buffer(&reply_ub, custom_service->reply_buffer, sizeof(custom_service->reply_buffer)); functions->cdr_serialize(ros_response, &reply_ub); uxr_buffer_reply( &custom_node->context->session, custom_service->stream_id, - custom_service->service_id, &sample_id, custom_service->replay_buffer, topic_size); + custom_service->service_id, &sample_id, custom_service->reply_buffer, topic_size); return RMW_RET_OK; } @@ -100,7 +100,7 @@ rmw_take_response( ucdrBuffer temp_buffer; ucdr_init_buffer( &temp_buffer, custom_client->micro_buffer[custom_client->history_read_index], - custom_client->micro_buffer_lenght[custom_client->history_read_index]); + custom_client->micro_buffer_length[custom_client->history_read_index]); bool deserialize_rv = functions->cdr_deserialize( &temp_buffer, diff --git a/rmw_microxrcedds_c/src/rmw_service.c b/rmw_microxrcedds_c/src/rmw_service.c index dba3ad1a..9df0390d 100644 --- a/rmw_microxrcedds_c/src/rmw_service.c +++ b/rmw_microxrcedds_c/src/rmw_service.c @@ -124,13 +124,24 @@ rmw_create_service( } service_req = uxr_buffer_create_replier_xml( &custom_node->context->session, - custom_node->context->reliable_output, custom_service->service_id, - custom_node->participant_id, rmw_uxrce_xml_buffer, UXR_REPLACE); + *custom_node->context->entity_creation_output, + custom_service->service_id, + custom_node->participant_id, + rmw_uxrce_xml_buffer, + UXR_REPLACE); #elif defined(MICRO_XRCEDDS_USE_REFS) - // CHECK IF THIS IS NECESSARY - // service_req = uxr_buffer_create_replier_ref(&custom_node->context->session, - // custom_node->context->reliable_output, custom_service->subscriber_id, - // custom_node->participant_id, "", UXR_REPLACE); + if (!build_replier_profile(service_name, rmw_uxrce_profile_name, sizeof(rmw_uxrce_profile_name))) { + RMW_SET_ERROR_MSG("failed to generate ref request for service creation"); + goto fail; + } + + service_req = uxr_buffer_create_replier_ref( + &custom_node->context->session, + *custom_node->context->entity_creation_output, + custom_client->client_id, + custom_node->participant_id, + rmw_uxrce_profile_name, + UXR_REPLACE); #endif rmw_service->data = custom_service; @@ -141,7 +152,7 @@ rmw_create_service( &custom_node->context->session, 1000, requests, status, 1)) { - RMW_SET_ERROR_MSG("Issues creating Micro XRCE-DDS entities"); + RMW_SET_ERROR_MSG("Issues creating Replier Micro XRCE-DDS entities"); put_memory(&service_memory, &custom_service->mem); goto fail; } @@ -159,7 +170,8 @@ rmw_create_service( custom_service->request_id = uxr_buffer_request_data( &custom_node->context->session, - custom_node->context->reliable_output, custom_service->service_id, + *custom_node->context->entity_creation_output, + custom_service->service_id, custom_service->stream_id, &delivery_control); } return rmw_service; @@ -200,7 +212,8 @@ rmw_destroy_service( rmw_uxrce_service_t * custom_service = (rmw_uxrce_service_t *)service->data; uint16_t delete_service = uxr_buffer_delete_entity( - &custom_node->context->session, custom_node->context->reliable_output, + &custom_node->context->session, + *custom_node->context->entity_creation_output, custom_service->service_id); uint16_t requests[] = {delete_service}; @@ -209,7 +222,7 @@ rmw_destroy_service( &custom_node->context->session, 1000, requests, status, sizeof(status))) { - RMW_SET_ERROR_MSG("unable to remove service from the server"); + RMW_SET_ERROR_MSG("Unable to remove service from the server"); result_ret = RMW_RET_ERROR; } else { rmw_uxrce_fini_service_memory(service); diff --git a/rmw_microxrcedds_c/src/rmw_subscription.c b/rmw_microxrcedds_c/src/rmw_subscription.c index 3a436bf6..33b65580 100644 --- a/rmw_microxrcedds_c/src/rmw_subscription.c +++ b/rmw_microxrcedds_c/src/rmw_subscription.c @@ -155,15 +155,19 @@ rmw_create_subscription( } subscriber_req = uxr_buffer_create_subscriber_xml( &custom_node->context->session, - custom_node->context->reliable_output, custom_subscription->subscriber_id, - custom_node->participant_id, rmw_uxrce_xml_buffer, UXR_REPLACE); + *custom_node->context->entity_creation_output, + custom_subscription->subscriber_id, + custom_node->participant_id, + rmw_uxrce_xml_buffer, + UXR_REPLACE); #elif defined(MICRO_XRCEDDS_USE_REFS) - // TODO(BORJA) Publisher by reference does not make sense in - // current micro XRCE-DDS implementation. subscriber_req = uxr_buffer_create_subscriber_xml( &custom_node->context->session, - custom_node->context->reliable_output, custom_subscription->subscriber_id, - custom_node->participant_id, "", UXR_REPLACE); + *custom_node->context->entity_creation_output, + custom_subscription->subscriber_id, + custom_node->participant_id, + "", + UXR_REPLACE); #endif @@ -184,29 +188,38 @@ rmw_create_subscription( datareader_req = uxr_buffer_create_datareader_xml( &custom_node->context->session, - custom_node->context->reliable_output, custom_subscription->datareader_id, - custom_subscription->subscriber_id, rmw_uxrce_xml_buffer, UXR_REPLACE); + *custom_node->context->entity_creation_output, + custom_subscription->datareader_id, + custom_subscription->subscriber_id, + rmw_uxrce_xml_buffer, + UXR_REPLACE); #elif defined(MICRO_XRCEDDS_USE_REFS) if (!build_datareader_profile(topic_name, rmw_uxrce_profile_name, sizeof(rmw_uxrce_profile_name))) { - RMW_SET_ERROR_MSG("failed to generate xml request for node creation"); + RMW_SET_ERROR_MSG("failed to generate ref request for datareader creation"); goto fail; } datareader_req = uxr_buffer_create_datareader_ref( &custom_node->context->session, - custom_node->context->reliable_output, custom_subscription->datareader_id, - custom_subscription->subscriber_id, rmw_uxrce_profile_name, UXR_REPLACE); + *custom_node->context->entity_creation_output, + custom_subscription->datareader_id, + custom_subscription->subscriber_id, + rmw_uxrce_profile_name, + UXR_REPLACE); #endif rmw_subscription->data = custom_subscription; uint16_t requests[] = {subscriber_req, datareader_req}; uint8_t status[sizeof(requests) / 2]; - if (!uxr_run_session_until_all_status( - &custom_node->context->session, 1000, requests, - status, sizeof(status))) + + if (UXR_BEST_EFFORT_STREAM == custom_node->context->entity_creation_output->type) + { + uxr_flash_output_streams(&custom_node->context->session); + } + else if(!uxr_run_session_until_all_status(&custom_node->context->session, 1000, requests, status,sizeof(status))) { - RMW_SET_ERROR_MSG("Issues creating Micro XRCE-DDS entities"); + RMW_SET_ERROR_MSG("Issues creating Subscriber Micro XRCE-DDS entities"); put_memory(&subscription_memory, &custom_subscription->mem); goto fail; } @@ -224,7 +237,8 @@ rmw_create_subscription( custom_subscription->subscription_request = uxr_buffer_request_data( &custom_node->context->session, - custom_node->context->reliable_output, custom_subscription->datareader_id, + *custom_node->context->entity_creation_output, + custom_subscription->datareader_id, custom_subscription->stream_id, &delivery_control); } return rmw_subscription; @@ -290,21 +304,27 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) uint16_t delete_datareader = uxr_buffer_delete_entity( &custom_subscription->owner_node->context->session, - custom_subscription->owner_node->context->reliable_output, + *custom_subscription->owner_node->context->entity_creation_output, custom_subscription->datareader_id); uint16_t delete_subscriber = uxr_buffer_delete_entity( &custom_subscription->owner_node->context->session, - custom_subscription->owner_node->context->reliable_output, + *custom_subscription->owner_node->context->entity_creation_output, custom_subscription->subscriber_id); uint16_t requests[] = {delete_datareader, delete_subscriber}; uint8_t status[sizeof(requests) / 2]; - if (!uxr_run_session_until_all_status( + + if (UXR_BEST_EFFORT_STREAM == custom_subscription->owner_node->context->entity_creation_output->type) + { + uxr_flash_output_streams(&custom_subscription->owner_node->context->session); + rmw_uxrce_fini_subscription_memory(subscription); + } + else if (!uxr_run_session_until_all_status( &custom_subscription->owner_node->context->session, 1000, requests, status, sizeof(status))) { - RMW_SET_ERROR_MSG("unable to remove publisher from the server"); + RMW_SET_ERROR_MSG("Unable to remove subscriber from the server"); result_ret = RMW_RET_ERROR; } else { rmw_uxrce_fini_subscription_memory(subscription); diff --git a/rmw_microxrcedds_c/src/rmw_take.c b/rmw_microxrcedds_c/src/rmw_take.c index c06829a2..efde5c08 100644 --- a/rmw_microxrcedds_c/src/rmw_take.c +++ b/rmw_microxrcedds_c/src/rmw_take.c @@ -58,7 +58,7 @@ rmw_take_with_info( ucdrBuffer temp_buffer; ucdr_init_buffer( &temp_buffer, custom_subscription->micro_buffer[custom_subscription->history_read_index], - custom_subscription->micro_buffer_lenght[custom_subscription->history_read_index]); + custom_subscription->micro_buffer_length[custom_subscription->history_read_index]); bool deserialize_rv = custom_subscription->type_support_callbacks->cdr_deserialize( &temp_buffer, diff --git a/rmw_microxrcedds_c/src/types.h b/rmw_microxrcedds_c/src/types.h index c53140d0..27a6eab1 100644 --- a/rmw_microxrcedds_c/src/types.h +++ b/rmw_microxrcedds_c/src/types.h @@ -70,6 +70,8 @@ struct rmw_context_impl_t uxrStreamId best_effort_output; uxrStreamId best_effort_input; + uxrStreamId * entity_creation_output; + uint8_t input_reliable_stream_buffer[RMW_UXRCE_MAX_BUFFER_SIZE]; uint8_t output_reliable_stream_buffer[RMW_UXRCE_MAX_BUFFER_SIZE]; uint8_t output_best_effort_stream_buffer[RMW_UXRCE_MAX_TRANSPORT_MTU]; @@ -113,7 +115,7 @@ typedef struct rmw_uxrce_service_t SampleIdentity sample_id[RMW_UXRCE_MAX_HISTORY]; uint8_t micro_buffer[RMW_UXRCE_MAX_HISTORY][RMW_UXRCE_MAX_BUFFER_SIZE]; - size_t micro_buffer_lenght[RMW_UXRCE_MAX_HISTORY]; + size_t micro_buffer_length[RMW_UXRCE_MAX_HISTORY]; uint8_t history_write_index; uint8_t history_read_index; @@ -121,7 +123,7 @@ typedef struct rmw_uxrce_service_t uxrStreamId stream_id; - uint8_t replay_buffer[RMW_UXRCE_MAX_TRANSPORT_MTU]; + uint8_t reply_buffer[RMW_UXRCE_MAX_TRANSPORT_MTU]; struct rmw_uxrce_node_t * owner_node; } rmw_uxrce_service_t; @@ -137,7 +139,7 @@ typedef struct rmw_uxrce_client_t int64_t reply_id[RMW_UXRCE_MAX_HISTORY]; uint8_t micro_buffer[RMW_UXRCE_MAX_HISTORY][RMW_UXRCE_MAX_BUFFER_SIZE]; - size_t micro_buffer_lenght[RMW_UXRCE_MAX_HISTORY]; + size_t micro_buffer_length[RMW_UXRCE_MAX_HISTORY]; uint8_t history_write_index; uint8_t history_read_index; @@ -160,7 +162,7 @@ typedef struct rmw_uxrce_subscription_t const message_type_support_callbacks_t * type_support_callbacks; uint8_t micro_buffer[RMW_UXRCE_MAX_HISTORY][RMW_UXRCE_MAX_BUFFER_SIZE]; - size_t micro_buffer_lenght[RMW_UXRCE_MAX_HISTORY]; + size_t micro_buffer_length[RMW_UXRCE_MAX_HISTORY]; uint8_t history_write_index; uint8_t history_read_index; diff --git a/rmw_microxrcedds_c/src/utils.c b/rmw_microxrcedds_c/src/utils.c index 7fd1ba8d..337f5761 100644 --- a/rmw_microxrcedds_c/src/utils.c +++ b/rmw_microxrcedds_c/src/utils.c @@ -374,4 +374,23 @@ bool is_uxrce_rmw_identifier_valid(const char * id) { return id != NULL && strcmp(id, rmw_get_implementation_identifier()) == 0; -} \ No newline at end of file +} +bool build_requester_profile(const char * service_name, char profile_name[], size_t buffer_size) +{ + const char * const format = "%s___requester"; + service_name++; + bool ret = false; + int written = snprintf(profile_name, buffer_size, format, service_name); + ret = (written > 0) && (written < (int)buffer_size); + return ret; +} + +bool build_replier_profile(const char * service_name, char profile_name[], size_t buffer_size) +{ + const char * const format = "%s___replier"; + service_name++; + bool ret = false; + int written = snprintf(profile_name, buffer_size, format, service_name); + ret = (written > 0) && (written < (int)buffer_size); + return ret; +} diff --git a/rmw_microxrcedds_c/test/test_rmw.cpp b/rmw_microxrcedds_c/test/test_rmw.cpp index ab435b13..5c4aef75 100644 --- a/rmw_microxrcedds_c/test/test_rmw.cpp +++ b/rmw_microxrcedds_c/test/test_rmw.cpp @@ -20,6 +20,8 @@ #include +#include + /* Testing rmw init and shutdown. htps://github.com/microROS/rmw-microxrcedds/issues/14 */ @@ -35,10 +37,14 @@ TEST(rmw_microxrcedds, init_shutdown) { Testing rmw agent autodiscovery. */ TEST(rmw_microxrcedds, autodiscover) { + +#ifndef UCLIENT_PROFILE_BROKERLESS rmw_context_t test_context = rmw_get_zero_initialized_context(); rmw_init_options_t test_options = rmw_get_zero_initialized_init_options(); ASSERT_EQ(rmw_init_options_init(&test_options, rcutils_get_default_allocator()), RMW_RET_OK); ASSERT_EQ(rmw_uros_discover_agent(&test_options), RMW_RET_OK); ASSERT_EQ(rmw_init(&test_options, &test_context), RMW_RET_OK); ASSERT_EQ(rmw_shutdown(&test_context), RMW_RET_OK); +#endif + }