diff --git a/apprise/plugins/home_assistant.py b/apprise/plugins/home_assistant.py index f04646bd8..b146cb8d9 100644 --- a/apprise/plugins/home_assistant.py +++ b/apprise/plugins/home_assistant.py @@ -27,8 +27,10 @@ # You must generate a "Long-Lived Access Token". This can be done from your # Home Assistant Profile page. - +from itertools import chain from json import dumps +import math +import re from uuid import uuid4 import requests @@ -36,12 +38,41 @@ from ..common import NotifyType from ..locale import gettext_lazy as _ from ..url import PrivacyMode -from ..utils.parse import validate_regex +from ..utils.parse import ( + is_domain_service_target, + parse_bool, + parse_domain_service_targets, + validate_regex, +) from .base import NotifyBase +# This regex matches exactly 8 hex digits, +# a dot, then exactly 64 hex digits. it can also be a JWT +# token in which case it will be 180 characters+ +RE_IS_LONG_LIVED_TOKEN = re.compile( + r"^([0-9a-f]{8}\.[0-9a-f]{64}|[a-z0-9_-]+\.[a-z0-9_-]+\.[a-z0-9_-]+)$", + re.I, +) + +# Define our supported device notification formats: +# - service +# - default domain is always 'notify' if one isn't detected +# - service:target +# - service:target1,target2,target3 +# - domain.service +# - domain.service:target +# - domain.service:target1,target2,target3 +# - - targets can be comma/space separated if more hten one +# - service:target1,target2,target3 + +# Define a persistent entry (used for handling message delivery +PERSISTENT_ENTRY = (None, None, []) + class NotifyHomeAssistant(NotifyBase): - """A wrapper for Home Assistant Notifications.""" + """ + A wrapper for Home Assistant Notifications + """ # The default descriptive name associated with the Notification service_name = "HomeAssistant" @@ -58,17 +89,29 @@ class NotifyHomeAssistant(NotifyBase): # Default to Home Assistant Default Insecure port of 8123 instead of 80 default_insecure_port = 8123 + # The maximum amount of services that can be notified in a single batch + default_batch_size = 10 + + # The default ha notification domain if one isn't detected + default_domain = "notify" + # A URL that takes you to the setup/help of the specific protocol setup_url = "https://appriseit.com/services/homeassistant/" # Define object templates templates = ( "{schema}://{host}/{accesstoken}", + "{schema}://{host}/{accesstoken}/{targets}", "{schema}://{host}:{port}/{accesstoken}", + "{schema}://{host}:{port}/{accesstoken}/{targets}", "{schema}://{user}@{host}/{accesstoken}", + "{schema}://{user}@{host}/{accesstoken}/{targets}", "{schema}://{user}@{host}:{port}/{accesstoken}", + "{schema}://{user}@{host}:{port}/{accesstoken}/{targets}", "{schema}://{user}:{password}@{host}/{accesstoken}", + "{schema}://{user}:{password}@{host}/{accesstoken}/{targets}", "{schema}://{user}:{password}@{host}:{port}/{accesstoken}", + "{schema}://{user}:{password}@{host}:{port}/{accesstoken}/{targets}", ) # Define our template tokens @@ -101,6 +144,15 @@ class NotifyHomeAssistant(NotifyBase): "private": True, "required": True, }, + "target_device": { + "name": _("Target Device"), + "type": "string", + "map_to": "targets", + }, + "targets": { + "name": _("Targets"), + "type": "list:string", + }, }, ) @@ -114,14 +166,39 @@ class NotifyHomeAssistant(NotifyBase): "type": "string", "regex": (r"^[a-z0-9_-]+$", "i"), }, + "batch": { + "name": _("Batch Mode"), + "type": "bool", + "default": False, + }, + "to": { + "alias_of": "targets", + }, + "token": { + # Shorthand alias for accesstoken in query string + "alias_of": "accesstoken", + }, + "prefix": { + "name": _("Path Prefix"), + "type": "string", + }, }, ) - def __init__(self, accesstoken, nid=None, **kwargs): + def __init__( + self, + accesstoken, + nid=None, + targets=None, + batch=None, + prefix=None, + **kwargs, + ): """Initialize Home Assistant Object.""" super().__init__(**kwargs) self.fullpath = kwargs.get("fullpath", "") + self.prefix = prefix if prefix else "" if not (self.secure or self.port): # Use default insecure port @@ -149,18 +226,50 @@ def __init__(self, accesstoken, nid=None, **kwargs): self.logger.warning(msg) raise TypeError(msg) + # Prepare Batch Mode Flag + self.batch = ( + self.template_args["batch"]["default"] if batch is None else batch + ) + + # Store our targets + self.targets = [] + + # Track our invalid targets + self._invalid_targets = [] + + if targets: + for target in parse_domain_service_targets(targets): + result = is_domain_service_target( + target, domain=self.default_domain + ) + if result: + self.targets.append( + ( + result["domain"], + result["service"], + result["targets"], + ) + ) + continue + + self.logger.warning( + "Dropped invalid [domain.]service[:target] entry " + "({}) specified.".format(target), + ) + self._invalid_targets.append(target) + else: + self.targets = [PERSISTENT_ENTRY] + return def send(self, body, title="", notify_type=NotifyType.INFO, **kwargs): """Sends Message.""" - # Prepare our persistent_notification.create payload + # Base payload; notification_id is only added for persistent + # notification calls — other service domains reject it. payload = { "title": title, "message": body, - # Use a unique ID so we don't over-write the last message - # we posted. Otherwise use the notification id specified - "notification_id": self.nid if self.nid else str(uuid4()), } # Prepare our headers @@ -181,20 +290,84 @@ def send(self, body, title="", notify_type=NotifyType.INFO, **kwargs): if isinstance(self.port, int): url += f":{self.port}" - url += ( + # Determine if we're doing it the old way (using persistent notices) + # or the new (supporting device targets) + has_targets = bool( + not self.targets or self.targets[0] is not PERSISTENT_ENTRY + ) + + if has_targets and not self.targets: + self.logger.warning( + "No valid Home Assistant service targets to notify." + ) + return False + + base_url = url + ( self.fullpath.rstrip("/") + "/api/services/persistent_notification/create" ) + # Send in batches if identified to do so + batch_size = 1 if not self.batch else self.default_batch_size + + for target in self.targets: + # Use a unique ID so we don't over-write the last message we + # posted. Otherwise use the notification id specified + if has_targets: + # Base target details + domain = target[0] + service = target[1] + + # Prepare our URL + base_url = ( + url + + self.prefix.rstrip("/") + + f"/api/services/{domain}/{service}" + ) + + # Possibly prepare batches + if target[2]: + _payload = payload.copy() + for index in range(0, len(target[2]), batch_size): + _payload["targets"] = target[2][ + index : index + batch_size + ] + if not self._ha_post( + base_url, _payload, headers, auth + ): + return False + + continue + + if not self._ha_post( + base_url, + payload, + headers, + auth, + persistent=not has_targets, + ): + return False + + return True + + def _ha_post(self, url, payload, headers, auth=None, persistent=False): + """ + Wrapper to single upstream server post + """ + # notification_id is only meaningful for persistent_notification; + # other HA service domains reject it with a 400. + if persistent: + payload["notification_id"] = self.nid if self.nid else str(uuid4()) + self.logger.debug( - "Home Assistant POST URL:" - f" {url} (cert_verify={self.verify_certificate!r})" + "Home Assistant POST URL: {} (cert_verify={!r})".format( + url, self.verify_certificate + ) ) - self.logger.debug(f"Home Assistant Payload: {payload!s}") + self.logger.debug("Home Assistant Payload: {}".format(str(payload))) # Always call throttle before any remote server i/o is made self.throttle() - try: r = requests.post( url, @@ -251,11 +424,7 @@ def url_identifier(self): self.user, self.password, self.host, - ( - self.port - if self.port - else (443 if self.secure else self.default_insecure_port) - ), + self.port, self.fullpath.rstrip("/"), self.accesstoken, ) @@ -264,7 +433,17 @@ def url(self, privacy=False, *args, **kwargs): """Returns the URL built dynamically based on specified arguments.""" # Define any URL parameters - params = {} + params = { + "batch": "yes" if self.batch else "no", + } + + if self.prefix not in ("", "/"): + params["prefix"] = ( + "/" + if not self.prefix + else "/{}/".format(self.prefix.strip("/")) + ) + if self.nid: params["nid"] = self.nid @@ -289,7 +468,13 @@ def url(self, privacy=False, *args, **kwargs): url = ( "{schema}://{auth}{hostname}{port}{fullpath}" - "{accesstoken}/?{params}" + "{accesstoken}/{targets}?{params}" + ) + + # Determine if we're doing it the old way (using persistent notices) + # or the new (supporting device targets) + has_targets = bool( + not self.targets or self.targets[0] is not PERSISTENT_ENTRY ) return url.format( @@ -312,9 +497,54 @@ def url(self, privacy=False, *args, **kwargs): ) ), accesstoken=self.pprint(self.accesstoken, privacy, safe=""), + targets="" + if not has_targets + else "/".join( + chain( + [ + NotifyHomeAssistant.quote( + "{}.{}{}".format( + x[0], + x[1], + "" if not x[2] else ":" + ",".join(x[2]), + ), + safe="", + ) + for x in self.targets + ], + [ + NotifyHomeAssistant.quote(x, safe="") + for x in self._invalid_targets + ], + ) + ), params=NotifyHomeAssistant.urlencode(params), ) + def __len__(self): + """ + Returns the number of targets associated with this notification + """ + # + # Factor batch into calculation + # + + # Determine if we're doing it the old way (using persistent notices) + # or the new (supporting device targets) + has_targets = bool( + not self.targets or self.targets[0] is not PERSISTENT_ENTRY + ) + + if not has_targets: + return 1 + + # Handle targets + batch_size = 1 if not self.batch else self.default_batch_size + return sum( + math.ceil(len(identities) / batch_size) if identities else 1 + for _, _, identities in self.targets + ) + @staticmethod def parse_url(url): """Parses the URL and returns enough arguments that can allow us to re- @@ -325,23 +555,82 @@ def parse_url(url): # We're done early as we couldn't load the results return results - # Get our Long-Lived Access Token - if "accesstoken" in results["qsd"] and len( - results["qsd"]["accesstoken"] - ): + # Initialize targets + results["targets"] = [] + + # Get our Long-Lived Access Token — check query string first + # (?accesstoken= or the shorthand ?token=) + if "accesstoken" in results["qsd"] and results["qsd"]["accesstoken"]: results["accesstoken"] = NotifyHomeAssistant.unquote( results["qsd"]["accesstoken"] ) + # Remaining path elements become targets + results["targets"] = NotifyHomeAssistant.split_path( + results["fullpath"] + ) + results["fullpath"] = "" + + elif "token" in results["qsd"] and results["qsd"]["token"]: + results["accesstoken"] = NotifyHomeAssistant.unquote( + results["qsd"]["token"] + ) + # Remaining path elements become targets + results["targets"] = NotifyHomeAssistant.split_path( + results["fullpath"] + ) + results["fullpath"] = "" else: - # Acquire our full path - fullpath = NotifyHomeAssistant.split_path(results["fullpath"]) + # Scan path forward; the first element matching the token + # regex is the access token. Everything before it is the + # prefix path; everything after it (reversed for consistent + # call ordering) is treated as service targets. + tokens = NotifyHomeAssistant.split_path(results["fullpath"]) + token_idx = None + for idx, t in enumerate(tokens): + if RE_IS_LONG_LIVED_TOKEN.match(t): + token_idx = idx + results["accesstoken"] = t + break + + if token_idx is not None: + # Elements after the token are service targets (reversed + # so the last URL segment is called first). + post_targets = list(reversed(tokens[token_idx + 1 :])) + # Elements before the token are also treated as service + # targets (reversed to maintain consistent ordering). + pre_targets = list(reversed(tokens[:token_idx])) + results["targets"] = post_targets + pre_targets + results["fullpath"] = "" + + elif tokens: + # No regex match — treat the last path element as the + # access token (plain token like "accesstoken" in tests). + results["accesstoken"] = tokens[-1] + results["fullpath"] = "" + + else: + results["accesstoken"] = None + + # Support ?to= for additional targets + if "to" in results["qsd"] and results["qsd"]["to"]: + results["targets"] += NotifyHomeAssistant.split_path( + NotifyHomeAssistant.unquote(results["qsd"]["to"]) + ) - # Otherwise pop the last element from our path to be it - results["accesstoken"] = fullpath.pop() if fullpath else None + # Support ?prefix= path-prefix override + if "prefix" in results["qsd"] and results["qsd"]["prefix"]: + results["prefix"] = NotifyHomeAssistant.unquote( + results["qsd"]["prefix"] + ) - # Re-assemble our full path - results["fullpath"] = "/" + "/".join(fullpath) if fullpath else "" + # Get Batch Mode Flag + results["batch"] = parse_bool( + results["qsd"].get( + "batch", + NotifyHomeAssistant.template_args["batch"]["default"], + ) + ) # Allow the specification of a unique notification_id so that # it will always replace the last one sent. diff --git a/apprise/utils/parse.py b/apprise/utils/parse.py index a592acd6c..19f20d0bf 100644 --- a/apprise/utils/parse.py +++ b/apprise/utils/parse.py @@ -100,6 +100,19 @@ r"\s*([+(\s]*[0-9][0-9()\s-]+[0-9])(?=$|[\s,+(]+[0-9])", re.I ) +IS_DOMAIN_SERVICE_TARGET = re.compile( + r"\s*((?P[a-z0-9_-]+)\.)?(?P[a-z0-9_-]+)" + r"(:(?P[a-z0-9_,-]+))?", + re.I, +) + +DOMAIN_SERVICE_TARGET_DETECTION_RE = re.compile( + r"\s*((?:[a-z0-9_-]+\.)?[a-z0-9_-]+" + r"(?::(?:[a-z0-9_-]+(?:,+[a-z0-9_-]+)+?))?)" + r"(?=$|(?:\s|,+\s|\s,+)+(?:[a-z0-9_-]+\.)?[a-z0-9_-]+)", + re.I, +) + # Support for prefix: (string followed by colon) infront of phone no PHONE_NO_WPREFIX_DETECTION_RE = re.compile( r"\s*((?:[a-z]+:)?[+(\s]*[0-9][0-9()\s-]+[0-9])" @@ -262,6 +275,44 @@ def is_uuid(uuid): return bool(match) +def is_domain_service_target(entry, domain="notify"): + """Determine if the specified entry a domain.service:target type + + Expects a string containing the following formats: + - service + - service:target + - service:target1,target2 + - domain.service:target + - domain.service:target1,target2 + + Args: + entry (str): The string you want to check. + + Returns: + bool: Returns False if the entry specified is domain.service:target + """ + + try: + result = IS_DOMAIN_SERVICE_TARGET.match(entry) + if not result: + # not parseable content as it does not even conform closely to a + # domain.service:target + return False + + except TypeError: + # not parseable content + return False + + return { + # Store domain or set default if not acquired + "domain": result.group("domain") if result.group("domain") else domain, + # store service + "service": result.group("service"), + # store targets if defined + "targets": parse_list(result.group("targets")), + } + + def is_phone_no(phone, min_len=10): """Determine if the specified entry is a phone number. @@ -358,7 +409,7 @@ def is_call_sign(callsign): callsign (str): The string you want to check. Returns: - bool: Returns False if the address specified is not a phone number + bool: Returns False if the enry specified is not a callsign """ try: @@ -849,6 +900,48 @@ def parse_bool(arg, default=False): return bool(arg) +def parse_domain_service_targets( + *args, store_unparseable=True, domain="notify", **kwargs +): + """ + Takes a string containing the following formats separated by space + - service + - service:target + - service:target1,target2 + - domain.service:target + - domain.service:target1,target2 + + If no domain is parsed, the default domain is returned. + + Targets can be comma separated (if multiple are to be defined) + """ + + result = [] + for arg in args: + if isinstance(arg, str) and arg: + _result = DOMAIN_SERVICE_TARGET_DETECTION_RE.findall(arg) + if _result: + result += _result + + elif not _result and store_unparseable: + # we had content passed into us that was lost because it was + # so poorly formatted that it didn't even come close to + # meeting the regular expression we defined. We intentially + # keep it as part of our result set so that parsing done + # at a higher level can at least report this to the end user + # and hopefully give them some indication as to what they + # may have done wrong. + result += list(filter(bool, re.split(STRING_DELIMITERS, arg))) + + elif isinstance(arg, (set, list, tuple)): + # Use recursion to handle the list of phone numbers + result += parse_domain_service_targets( + *arg, store_unparseable=store_unparseable, domain=domain + ) + + return result + + def parse_phone_no(*args, store_unparseable=True, prefix=False, **kwargs): """Takes a string containing phone numbers separated by comma's and/or spaces and returns a list.""" diff --git a/tests/test_apprise_utils.py b/tests/test_apprise_utils.py index 82ef1d858..dbd5d8f3c 100644 --- a/tests/test_apprise_utils.py +++ b/tests/test_apprise_utils.py @@ -1666,6 +1666,67 @@ def test_is_email(): assert results["user"] == "a-z0-9_!#$%&*/=?%`{|}~^.-" +def test_is_domain_service_target(): + """ + API: is_domain_service_target() function + + """ + # Invalid information + assert utils.parse.is_domain_service_target(None) is False + assert utils.parse.is_domain_service_target(42) is False + assert utils.parse.is_domain_service_target(object) is False + assert utils.parse.is_domain_service_target("") is False + assert utils.parse.is_domain_service_target("+()") is False + assert utils.parse.is_domain_service_target("+") is False + + # Valid entries + result = utils.parse.is_domain_service_target("service") + assert isinstance(result, dict) + assert result["service"] == "service" + # Default domain + assert result["domain"] == "notify" + assert isinstance(result["targets"], list) + assert len(result["targets"]) == 0 + + result = utils.parse.is_domain_service_target("domain.service") + assert isinstance(result, dict) + assert result["service"] == "service" + assert result["domain"] == "domain" + assert isinstance(result["targets"], list) + assert len(result["targets"]) == 0 + + result = utils.parse.is_domain_service_target("domain.service:target") + assert isinstance(result, dict) + assert result["service"] == "service" + assert result["domain"] == "domain" + assert isinstance(result["targets"], list) + assert len(result["targets"]) == 1 + assert result["targets"][0] == "target" + + result = utils.parse.is_domain_service_target("domain.service:t1,t2,t3") + assert isinstance(result, dict) + assert result["service"] == "service" + assert result["domain"] == "domain" + assert isinstance(result["targets"], list) + assert len(result["targets"]) == 3 + assert "t1" in result["targets"] + assert "t2" in result["targets"] + assert "t3" in result["targets"] + + result = utils.parse.is_domain_service_target( + "service:t1,t2,t3", domain="new_default" + ) + assert isinstance(result, dict) + assert result["service"] == "service" + # Default domain + assert result["domain"] == "new_default" + assert isinstance(result["targets"], list) + assert len(result["targets"]) == 3 + assert "t1" in result["targets"] + assert "t2" in result["targets"] + assert "t3" in result["targets"] + + def test_is_call_sign_no(): """ API: is_call_sign() function @@ -1894,6 +1955,61 @@ def test_parse_call_sign(): assert "A0AF-12" in results +def test_parse_domain_service_targets(): + """utils: parse_domain_service_targets() testing""" + + results = utils.parse.parse_domain_service_targets("") + assert isinstance(results, list) + assert len(results) == 0 + + results = utils.parse.parse_domain_service_targets("service1 service2") + assert isinstance(results, list) + assert len(results) == 2 + assert "service1" in results + assert "service2" in results + + results = utils.parse.parse_domain_service_targets( + "service1:target1,target2" + ) + assert isinstance(results, list) + assert len(results) == 1 + assert "service1:target1,target2" in results + + results = utils.parse.parse_domain_service_targets( + "service1:target1,target2 service2 domain.service3" + ) + assert isinstance(results, list) + assert len(results) == 3 + assert "service1:target1,target2" in results + assert "service2" in results + assert "domain.service3" in results + + # Support a comma in the space between entries + results = utils.parse.parse_domain_service_targets( + "service1:target1,target2, service2 ,domain.service3, , , service4" + ) + assert isinstance(results, list) + assert len(results) == 4 + assert "service1:target1,target2" in results + assert "service2" in results + assert "domain.service3" in results + assert "service4" in results + + results = utils.parse.parse_domain_service_targets( + "service:target1,target2" + ) + assert isinstance(results, list) + assert len(results) == 1 + assert "service:target1,target2" in results + + # Handle unparseables + results = utils.parse.parse_domain_service_targets( + ": %invalid ^entries%", store_unparseable=False + ) + assert isinstance(results, list) + assert len(results) == 0 + + def test_parse_phone_no(): """utils: parse_phone_no() testing""" # A simple single array entry (As str) diff --git a/tests/test_plugin_homeassistant.py b/tests/test_plugin_homeassistant.py index 9b3ce71b0..09ebfc806 100644 --- a/tests/test_plugin_homeassistant.py +++ b/tests/test_plugin_homeassistant.py @@ -25,6 +25,8 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. +import json + # Disable logging for a cleaner testing output import logging from unittest import mock @@ -65,13 +67,60 @@ }, ), ( - "hassio://localhost/long-lived-access-token", + "hassio://localhost/long.lived.token", { "instance": NotifyHomeAssistant, }, ), ( - "hassio://user:pass@localhost/long-lived-access-token/", + "hassio://localhost/prefix/path/long.lived.token", + { + "instance": NotifyHomeAssistant, + }, + ), + ( + "hassio://localhost/long.lived.token?prefix=/ha", + { + "instance": NotifyHomeAssistant, + }, + ), + ( + "hassio://localhost/service/?token=long.lived.token&prefix=/ha", + { + "instance": NotifyHomeAssistant, + }, + ), + ( + "hassio://localhost/?token=long.lived.token&prefix=/ha&to=service", + { + "instance": NotifyHomeAssistant, + }, + ), + ( + "hassio://localhost/service/$%/?token=long.lived.token&prefix=/ha", + { + # Tests an invalid service entry + "instance": NotifyHomeAssistant, + }, + ), + ( + "hassio://localhost/%only%/%invalid%/?token=lng.lived.token&prefix=/ha", + { + # Tests an invalid service entry + "instance": NotifyHomeAssistant, + # we'll have a notify response failure in this case + "notify_response": False, + }, + ), + ( + "hassio://localhost/service/?accesstoken=long.lived.token", + { + # accesstoken via query string (?accesstoken=) + "instance": NotifyHomeAssistant, + }, + ), + ( + "hassio://user:pass@localhost/long.lived.token/", { "instance": NotifyHomeAssistant, # Our expected url(privacy=True) startswith() response: @@ -79,64 +128,64 @@ }, ), ( - "hassio://localhost:80/long-lived-access-token", + "hassio://localhost:80/long.lived.token", { "instance": NotifyHomeAssistant, }, ), ( - "hassio://user@localhost:8123/llat", + "hassio://user@localhost:8123/long.lived.token", { "instance": NotifyHomeAssistant, - "privacy_url": "hassio://user@localhost/l...t", + "privacy_url": "hassio://user@localhost/l...n", }, ), ( - "hassios://localhost/llat?nid=!%", + "hassios://localhost/long.lived.token?nid=!%", { # Invalid notification_id "instance": TypeError, }, ), ( - "hassios://localhost/llat?nid=abcd", + "hassios://localhost/long.lived.token?nid=abcd", { # Valid notification_id "instance": NotifyHomeAssistant, }, ), ( - "hassios://user:pass@localhost/llat", + "hassios://user:pass@localhost/long.lived.token", { "instance": NotifyHomeAssistant, - "privacy_url": "hassios://user:****@localhost/l...t", + "privacy_url": "hassios://user:****@localhost/l...n", }, ), ( - "hassios://localhost:8443/path/llat/", + "hassios://localhost:8443/path/long.lived.token/", { "instance": NotifyHomeAssistant, - "privacy_url": "hassios://localhost:8443/path/l...t", + "privacy_url": "hassios://localhost:8443/l...n", }, ), ( - "hassio://localhost:8123/a/path?accesstoken=llat", + "hassio://localhost:8123/a/path?token=long.lived.token", { "instance": NotifyHomeAssistant, # Default port; so it's stripped off - # accesstoken was specified as kwarg - "privacy_url": "hassio://localhost/a/path/l...t", + # token was specified as kwarg + "privacy_url": "hassio://localhost/l...n", }, ), ( - "hassios://user:password@localhost:80/llat/", + "hassios://user:password@localhost:80/long.lived.token/", { "instance": NotifyHomeAssistant, "privacy_url": "hassios://user:****@localhost:80", }, ), ( - "hassio://user:pass@localhost:8123/llat", + "hassio://user:pass@localhost:8123/long.lived.token", { "instance": NotifyHomeAssistant, # force a failure @@ -145,20 +194,20 @@ }, ), ( - "hassio://user:pass@localhost/llat", + "hassio://user:pass@localhost/long.lived.token", { "instance": NotifyHomeAssistant, - # throw a bizarre code forcing us to fail to look it up + # throw a bizzare code forcing us to fail to look it up "response": False, "requests_response_code": 999, }, ), ( - "hassio://user:pass@localhost/llat", + "hassio://user:pass@localhost/long.lived.token", { "instance": NotifyHomeAssistant, - # Throws a series of i/o exceptions with this flag - # is set and tests that we gracefully handle them + # Throws a series of connection and transfer exceptions + # when this flag is set and tests that we gracfully handle them "test_requests_exceptions": True, }, ), @@ -196,3 +245,190 @@ def test_plugin_homeassistant_general(mock_post): mock_post.call_args_list[0][0][0] == "http://localhost:8123/api/services/persistent_notification/create" ) + + # Reset our mock object + mock_post.reset_mock() + + # Now let's notify an object + obj = Apprise.instantiate("hassio://localhost/long.lived.token/service") + assert isinstance(obj, NotifyHomeAssistant) is True + assert isinstance(obj.url(), str) is True + + # Send Notification + assert obj.send(body="test") is True + + assert ( + mock_post.call_args_list[0][0][0] + == "http://localhost:8123/api/services/notify/service" + ) + posted_json = json.loads(mock_post.call_args_list[0][1]["data"]) + assert "notification_id" not in posted_json + assert "targets" not in posted_json + assert "message" in posted_json + assert posted_json["message"] == "test" + assert "title" in posted_json + assert posted_json["title"] == "" + + # Reset our mock object + mock_post.reset_mock() + + # + # No Batch Processing + # + + # Now let's notify an object + obj = Apprise.instantiate( + "hassio://localhost/long.lived.token/serviceA:target1,target2/" + "service2/domain1.service3?batch=no" + ) + assert isinstance(obj, NotifyHomeAssistant) is True + assert isinstance(obj.url(), str) is True + + # Send Notification + assert obj.send(body="test-body", title="title") is True + + # Entries are split apart + assert len(obj) == 4 + assert mock_post.call_count == 4 + + assert ( + mock_post.call_args_list[0][0][0] + == "http://localhost:8123/api/services/domain1/service3" + ) + posted_json = json.loads(mock_post.call_args_list[0][1]["data"]) + assert "notification_id" not in posted_json + assert "targets" not in posted_json + assert "message" in posted_json + assert posted_json["message"] == "test-body" + assert "title" in posted_json + assert posted_json["title"] == "title" + + assert ( + mock_post.call_args_list[1][0][0] + == "http://localhost:8123/api/services/notify/service2" + ) + posted_json = json.loads(mock_post.call_args_list[1][1]["data"]) + assert "notification_id" not in posted_json + assert "targets" not in posted_json + assert "message" in posted_json + assert posted_json["message"] == "test-body" + assert "title" in posted_json + assert posted_json["title"] == "title" + + assert ( + mock_post.call_args_list[2][0][0] + == "http://localhost:8123/api/services/notify/serviceA" + ) + posted_json = json.loads(mock_post.call_args_list[2][1]["data"]) + assert "notification_id" not in posted_json + assert "targets" in posted_json + assert isinstance(posted_json["targets"], list) + assert len(posted_json["targets"]) == 1 + assert "target1" in posted_json["targets"] + assert "message" in posted_json + assert posted_json["message"] == "test-body" + assert "title" in posted_json + assert posted_json["title"] == "title" + + assert ( + mock_post.call_args_list[3][0][0] + == "http://localhost:8123/api/services/notify/serviceA" + ) + posted_json = json.loads(mock_post.call_args_list[3][1]["data"]) + assert "notification_id" not in posted_json + assert "targets" in posted_json + assert isinstance(posted_json["targets"], list) + assert len(posted_json["targets"]) == 1 + assert "target2" in posted_json["targets"] + assert "message" in posted_json + assert posted_json["message"] == "test-body" + assert "title" in posted_json + assert posted_json["title"] == "title" + + # Reset our mock object + mock_post.reset_mock() + + # + # Batch Processing + # + + # Now let's notify an object + obj = Apprise.instantiate( + "hassio://localhost/long.lived.token/serviceA:target1,target2/" + "service2/domain1.service3?batch=yes" + ) + assert isinstance(obj, NotifyHomeAssistant) is True + assert isinstance(obj.url(), str) is True + + # Send Notification + assert obj.send(body="test-body", title="title") is True + + # Entries targets can be grouped + assert len(obj) == 3 + assert mock_post.call_count == 3 + + assert ( + mock_post.call_args_list[0][0][0] + == "http://localhost:8123/api/services/domain1/service3" + ) + posted_json = json.loads(mock_post.call_args_list[0][1]["data"]) + assert "notification_id" not in posted_json + assert "targets" not in posted_json + assert "message" in posted_json + assert posted_json["message"] == "test-body" + assert "title" in posted_json + assert posted_json["title"] == "title" + + assert ( + mock_post.call_args_list[1][0][0] + == "http://localhost:8123/api/services/notify/service2" + ) + posted_json = json.loads(mock_post.call_args_list[1][1]["data"]) + assert "notification_id" not in posted_json + assert "targets" not in posted_json + assert "message" in posted_json + assert posted_json["message"] == "test-body" + assert "title" in posted_json + assert posted_json["title"] == "title" + + assert ( + mock_post.call_args_list[2][0][0] + == "http://localhost:8123/api/services/notify/serviceA" + ) + posted_json = json.loads(mock_post.call_args_list[2][1]["data"]) + assert "notification_id" not in posted_json + assert "targets" in posted_json + assert isinstance(posted_json["targets"], list) + # Our batch groups our targets + assert len(posted_json["targets"]) == 2 + assert "target1" in posted_json["targets"] + assert "target2" in posted_json["targets"] + assert "message" in posted_json + assert posted_json["message"] == "test-body" + assert "title" in posted_json + assert posted_json["title"] == "title" + + # Reset our mock object + mock_post.reset_mock() + + # + # Test error handling on multi-query request + # + + # Now let's notify an object + obj = Apprise.instantiate( + "hassio://localhost/long.lived.token/serviceA:target1,target2/" + "service2:target3,target4,target5,target6?batch=no" + ) + + assert isinstance(obj, NotifyHomeAssistant) is True + assert isinstance(obj.url(), str) is True + + bad_response = mock.Mock() + bad_response.content = "" + bad_response.status_code = requests.codes.not_found + + mock_post.side_effect = (response, bad_response) + + # We will fail on our second message sent + assert obj.send(body="test-body", title="title") is False