Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pip install asyncio_for_robotics
### Along with Zenoh

```bash
pip install asyncio_for_robotics[zenoh]
pip install asyncio_for_robotics eclipse-zenoh
```

## Read more
Expand Down Expand Up @@ -159,6 +159,19 @@ last_second_average = sum/total
assert last_second_average == pytest.approx(expected_average)
```

### Apply pre-processing to a data-stream

Application:
- Parse payload of different transport into a common type.

```python
# ROS2 String type afor subscriber
inner_sub: Sub[String] = asyncio_for_robotics.ros2.Sub(String, "topic_name")
# converted into a subscriber generating python `str`
ros2str_func = lambda msg: msg.data
sub: Sub[str] = ConverterSub(sub=inner_sub, convert_func=ros2str_func)
```

## About Speed

The inevitable question: *“But isn’t this slower than the ROS 2 executor? ROS 2 is the best!”*
Expand Down
3 changes: 2 additions & 1 deletion asyncio_for_robotics/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from .core.utils import soft_timeout, soft_wait_for, Rate
from .core.sub import BaseSub
from .core.sub import BaseSub, ConverterSub

__all__ = [
"soft_wait_for",
"soft_timeout",
"Rate",
"BaseSub",
"ConverterSub",
]
3 changes: 2 additions & 1 deletion asyncio_for_robotics/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from .utils import soft_timeout, soft_wait_for, Rate
from .sub import BaseSub
from .sub import BaseSub, ConverterSub

__all__ = [
"BaseSub",
"ConverterSub",
"soft_wait_for",
"soft_timeout",
"Rate",
Expand Down
129 changes: 112 additions & 17 deletions asyncio_for_robotics/core/sub.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import asyncio
import logging
from abc import ABC, abstractmethod
from asyncio.queues import Queue
from collections import deque
from types import CoroutineType
from contextlib import suppress
from typing import (
Any,
AsyncGenerator,
Expand All @@ -21,10 +19,16 @@

logger = logging.getLogger(__name__)


class SubClosedException(RuntimeError):
"""Raised when awaiting data from a closed subscriber."""


_MsgType = TypeVar("_MsgType")
_T = TypeVar("_T")


class BaseSub(Generic[_MsgType], ABC):
class BaseSub(Generic[_MsgType]):
def __init__(
self,
) -> None:
Expand All @@ -39,7 +43,7 @@ def __init__(
implementation.

To implements your own sheduling methods (new type of queue, buffer,
genrator ...), please either:
genrator ...), please either:

- inherit from this class, then overide self._input_data_asyncio
- put a callback inside self.asap_callback
Expand All @@ -60,6 +64,11 @@ def __init__(
self._value_flag: asyncio.Event = asyncio.Event()
#: Lastest message value
self._value: Optional[_MsgType] = None
#: Event when closing the sub
self._closed = asyncio.Event()
self._raise_on_close_exc: Exception = SubClosedException(
f"Subscriber '{self.name}' has been closed"
)
logger.debug("created sub %s", self.name)

@property
Expand Down Expand Up @@ -87,14 +96,13 @@ def input_data(self, data: _MsgType) -> bool:
self._event_loop.call_soon_threadsafe(self._input_data_asyncio, data)
return True


async def wait_for_value(self) -> _MsgType:
"""Latest message.

Returns:
The latest message (if none, awaits the first message)
"""
await self._value_flag.wait()
await self._wait_or_raise_closed(self._value_flag.wait())
assert self._value is not None
return self._value

Expand All @@ -118,9 +126,9 @@ async def func() -> _MsgType:
assert self._value is not None
return self._value

return func()
return self._wait_or_raise_closed(func())

def wait_for_next(self) -> Awaitable[_MsgType]:
def wait_for_next(self) -> Coroutine[Any, Any, _MsgType]:
"""Awaits exactly the next value.

.. Note:
Expand All @@ -144,7 +152,7 @@ async def func() -> _MsgType:
finally:
self._dyncamic_queues.discard(q)

return func()
return self._wait_or_raise_closed(func())

def listen(self, fresh=False) -> AsyncGenerator[_MsgType, None]:
"""Itterates over the newest message.
Expand All @@ -164,7 +172,11 @@ def listen(self, fresh=False) -> AsyncGenerator[_MsgType, None]:
return self.listen_reliable(fresh, 1, False)

def listen_reliable(
self, fresh=False, queue_size: int = 10, lifo=False
self,
fresh=False,
queue_size: int = 10,
lifo=False,
exit_on_close: bool = False,
) -> AsyncGenerator[_MsgType, None]:
"""Itterates over every incomming messages. (does not miss messages)

Expand All @@ -177,6 +189,8 @@ def listen_reliable(
fresh: If false, first yield can be the latest value
queue_size: size of the queue of values
lifo: If True, uses a last in first out queue instead of default fifo.
return_on_close: If True, the async for loop will exit when
`.close()` is called. Else, exception will be raised (default)

Returns:
Async generator itterating over every incomming message.
Expand All @@ -190,19 +204,21 @@ def listen_reliable(
if self._value_flag.is_set() and not fresh:
assert self._value is not None, "impossible if flag set"
q.put_nowait(self._value)
return self._unprimed_listen_reliable(q)
return self._unprimed_listen_reliable(q, exit_on_close)

async def _unprimed_listen_reliable(
self, queue: asyncio.Queue
self, queue: asyncio.Queue, exit_on_close: bool = False
) -> AsyncGenerator[_MsgType, None]:
logger.debug("Reliable listener first iter %s", self.name)
try:
while True:
# logger.debug("Reliable listener waiting data %s", self.name)
msg = await queue.get()
# logger.debug("Reliable listener got data %s", self.name)
msg = await self._wait_or_raise_closed(queue.get())
yield msg
# logger.debug("Reliable listener yielded data %s", self.name)
except Exception as e:
if e == self._raise_on_close_exc and exit_on_close:
return
else:
raise e
finally:
self._dyncamic_queues.discard(queue)
logger.debug("Reliable listener closed %s", self.name)
Expand Down Expand Up @@ -236,3 +252,82 @@ async def _wakeup_new(self):
"""fires new_value_cond"""
async with self.new_value_cond:
self.new_value_cond.notify_all()

async def _wait_or_raise_closed(
self, awaitable: Coroutine[Any, Any, _T] | Awaitable[_T]
) -> _T:
"""Wait for an awaitable or raise exception on subscriber shutdown.

Raises:
SubClosedError: if the subscriber is closed before completion.
Exception: any exception raised by the awaitable.

Returns:
Awaited awaitable
"""
main_task = asyncio.ensure_future(awaitable)
close_task = asyncio.ensure_future(self._closed.wait())
done, pending = await asyncio.wait(
[main_task, close_task],
return_when=asyncio.FIRST_COMPLETED,
)
try:
if close_task in done:
logger.info(f"Terminating task because sub is closed")
raise self._raise_on_close_exc
return await main_task
finally:
for task in pending:
task.cancel()
if len(pending) != 0:
with suppress(asyncio.CancelledError):
await asyncio.wait(pending)

def close(self):
self._closed.set()


_InType = TypeVar("_InType")
_OutType = TypeVar("_OutType")


class ConverterSub(BaseSub[_OutType]):
def __init__(
self,
sub: BaseSub[_InType],
convert_func: Callable[[_InType], _OutType] = lambda x: x,
) -> None:
"""Subscriber that applies a transformation to another subscriber.

This subscriber listens reliably to an upstream ``BaseSub`` and
publishes transformed messages as a new ``BaseSub`` instance. The
original subscriber is left unchanged.

Args:
sub: Upstream subscriber to listen to.
convert_func: Function applied to each incoming message. Identity
by default.
"""
#: Upstream subscriber
self.sub: BaseSub = sub
#: Transformation applied to each received message
self.convert_func = convert_func
super().__init__()
#: Background task running the conversion loop
self._loop_task = asyncio.create_task(self._converter_loop())
#: Optional callback invoked on close (typically upstream close).
self.callback_on_close: Optional[Callable[[], Any]] = None
if hasattr(self.sub, "close"):
if callable(self.sub.close):
self.callback_on_close = self.sub.close

async def _converter_loop(self):
async for msg in self.sub.listen_reliable():
new = self.convert_func(msg)
self._input_data_asyncio(new)

def close(self):
self._loop_task.cancel()
if self.callback_on_close is not None:
self.callback_on_close()
super().close()
3 changes: 2 additions & 1 deletion asyncio_for_robotics/ros2/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from ..core.utils import Rate, soft_timeout, soft_wait_for
from .. import ConverterSub, Rate, soft_timeout, soft_wait_for
from .service import Client, Server
from .session import (
GLOBAL_SESSION,
Expand All @@ -12,6 +12,7 @@
from .utils import QOS_DEFAULT, QOS_TRANSIENT, TopicInfo

__all__ = [
"ConverterSub",
"soft_wait_for",
"soft_timeout",
"Rate",
Expand Down
1 change: 1 addition & 0 deletions asyncio_for_robotics/ros2/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ def _differed_header_ready_cbk(self, responder_for_user: Responder):
logger.error(e)

def close(self):
super().close()
with self.session.lock() as node:
if not node.executor.context.ok():
return
Expand Down
1 change: 1 addition & 0 deletions asyncio_for_robotics/ros2/sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def callback_for_sub(self, sample: _MsgType):
logger.error(e)

def close(self):
super().close()
with self.session.lock() as node:
if not node.executor.context.ok():
return
Expand Down
3 changes: 2 additions & 1 deletion asyncio_for_robotics/textio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from ..core.utils import Rate, soft_timeout, soft_wait_for
from .. import ConverterSub, Rate, soft_timeout, soft_wait_for
from .sub import Sub, from_proc_stdout

__all__ = [
"from_proc_stdout",
"soft_wait_for",
"soft_timeout",
"Rate",
"ConverterSub",
"Sub",
"from_proc_stdout",
]
11 changes: 3 additions & 8 deletions asyncio_for_robotics/textio/sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ def __init__(
warnings.warn("Windows requires changing the asyncio loop type")
else:
self._event_loop.add_reader(self.stream.fileno(), self._io_update_cbk)
self.is_closed = False
self._close_event = asyncio.Event()

@property
def name(self) -> str:
Expand All @@ -52,7 +50,6 @@ def _win_io_update_cbk(self, line):
healthy = self.input_data(line)
if not healthy:
return
# self._event_loop.call_soon_threadsafe(self.close)

def _io_update_cbk(self):
"""Is called on updates to the IO file."""
Expand All @@ -65,12 +62,10 @@ def _io_update_cbk(self):

def close(self):
"""Closes the file reader (not the file)."""
if self.is_closed:
if self._closed.is_set():
return
logger.debug(f"closing {self.name}")
self.is_closed = True
self._close_event.set()
self._event_loop.remove_reader(self.stream.fileno())
super().close()

def from_proc_stdout(
process: subprocess.Popen[_MsgType],
Expand Down Expand Up @@ -107,7 +102,7 @@ async def close_reader():

async def closed_so_stop_waiting():
nonlocal proc_wait_task
await sub._close_event.wait()
await sub._closed.wait()
logger.debug(f"{sub.name} closed cancelling process monitoring task")
proc_wait_task.cancel()
await proc_wait_task
Expand Down
3 changes: 2 additions & 1 deletion asyncio_for_robotics/zenoh/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
auto_session,
)
from .sub import Sub
from ..core.utils import soft_timeout, soft_wait_for, Rate
from .. import soft_timeout, soft_wait_for, Rate, ConverterSub

__all__ = [
"soft_wait_for",
Expand All @@ -12,4 +12,5 @@
"auto_session",
"set_auto_session",
"Sub",
"ConverterSub",
]
Loading
Loading