Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sssd_test_framework/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ def test_ldap(client: Client, ldap: LDAP):
name="gdm",
topology=Topology(TopologyDomain("sssd", client=1, ipa=1, keycloak=1)),
controller=GDMTopologyController(),
domains=dict(test="sssd.ipa[0]"),
fixtures=dict(client="sssd.client[0]", ipa="sssd.ipa[0]", provider="sssd.ipa[0]", keycloak="sssd.keycloak[0]"),
)
"""
Expand Down
101 changes: 68 additions & 33 deletions sssd_test_framework/topology_controllers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import re
import socket

from pytest_mh import BackupTopologyController
Expand Down Expand Up @@ -58,6 +59,54 @@ def teardown(self) -> None:

super().teardown()

def join_domain(self, client: ClientHost, provider: IPAHost | ADHost | SambaHost):
"""
Helper method for joining domains
"""
self.logger.info(f"Enrolling {client.hostname} into {provider.domain}")

# Remove any existing Kerberos configuration and keytab
client.fs.rm("/etc/krb5.conf")
client.fs.rm("/etc/krb5.keytab")

# First check if joined. If so, leave.
result = client.conn.exec(["realm", "list"])
self.logger.info(f"REALM_LIST STDOUT: \n{result.stdout}")
self.logger.info(f"REALM_LIST STDERR: \n{result.stderr}")
pattern = rf"^{re.escape(provider.domain)}$"
if re.search(pattern, result.stdout, re.MULTILINE):
self.logger.info(f"Found {provider.domain} in realm list. Leaving.")
result = client.conn.exec(
["realm", "leave", provider.domain], input=provider.adminpw, raise_on_error=False
)
self.logger.info(f"Optional realm leave failed...continuing to join {provider.domain}.")

if isinstance(provider, (IPAHost)):
# Backup ipa-client-install files
client.fs.backup("/etc/ipa")
client.fs.backup("/var/lib/ipa-client")

# Configure realm to keep kerberos intact
client.fs.backup("/etc/realmd.conf")
client.fs.write(
"/etc/realmd.conf",
"""
[service]
manage-krb5-conf = no
""",
)

# Join provider domain
result = client.conn.exec(["realm", "join", provider.domain], input=provider.adminpw, raise_on_error=False)
if result.rc != 0:
self.logger.info(f"Running realm join failed with:\n{result.stdout}\n{result.stderr}")
self.logger.info("Trying uninstall and join again.")
if isinstance(provider, (IPAHost)):
client.conn.exec(["ipa-client-install", "--uninstall", "-U"])
else:
client.conn.exec(["realm", "leave", "--unattended", provider.domain], input=provider.adminpw)
client.conn.exec(["realm", "join", provider.domain], input=provider.adminpw)


class ClientTopologyController(ProvisionedBackupTopologyController):
"""
Expand Down Expand Up @@ -86,18 +135,8 @@ def topology_setup(self, client: ClientHost, ipa: IPAHost) -> None:
self.logger.info(f"Topology '{self.name}' is already provisioned")
return

self.logger.info(f"Enrolling {client.hostname} into {ipa.domain}")

# Remove any existing Kerberos configuration and keytab
client.fs.rm("/etc/krb5.conf")
client.fs.rm("/etc/krb5.keytab")

# Backup ipa-client-install files
client.fs.backup("/etc/ipa")
client.fs.backup("/var/lib/ipa-client")

# Join ipa domain
client.conn.exec(["realm", "join", ipa.domain], input=ipa.adminpw)
# Join IPA domain
self.join_domain(client, ipa)

# Backup so we can restore to this state after each test
super().topology_setup()
Expand All @@ -114,14 +153,8 @@ def topology_setup(self, client: ClientHost, provider: ADHost | SambaHost) -> No
self.logger.info(f"Topology '{self.name}' is already provisioned")
return

self.logger.info(f"Enrolling {client.hostname} into {provider.domain}")

# Remove any existing Kerberos configuration and keytab
client.fs.rm("/etc/krb5.conf")
client.fs.rm("/etc/krb5.keytab")

# Join AD domain
client.conn.exec(["realm", "join", provider.domain], input=provider.adminpw)
# Join AD/Samba domain
self.join_domain(client, provider)

# Backup so we can restore to this state after each test
super().topology_setup()
Expand Down Expand Up @@ -156,18 +189,8 @@ def topology_setup(self, client: ClientHost, ipa: IPAHost, trusted: ADHost | Sam

# Do not enroll client into IPA domain if it is already joined
if "ipa" not in self.multihost.provisioned_topologies:
self.logger.info(f"Enrolling {client.hostname} into {ipa.domain}")

# Remove any existing Kerberos configuration and keytab
client.fs.rm("/etc/krb5.conf")
client.fs.rm("/etc/krb5.keytab")

# Backup ipa-client-install files
client.fs.backup("/etc/ipa")
client.fs.backup("/var/lib/ipa-client")

# Join IPA domain)
client.conn.exec(["realm", "join", ipa.domain], input=ipa.adminpw)
# Join IPA domain
self.join_domain(client, ipa)

# Backup so we can restore to this state after each test
super().topology_setup()
Expand Down Expand Up @@ -284,12 +307,19 @@ class GDMTopologyController(ProvisionedBackupTopologyController):

@BackupTopologyController.restore_vanilla_on_error
def topology_setup(self, client: ClientHost, ipa: IPAHost, keycloak: KeycloakHost) -> None:
if "gdm" not in client.features or not client.features["gdm"]:
self.logger.info(f"Topology '{self.name}' setup skipped because gdm feature not found on client")
return

if self.provisioned:
self.logger.info(f"Topology '{self.name}' is already provisioned")
return

self.logger.info(f"Enrolling IPA server {ipa.hostname} into {keycloak.hostname} by creating an IdP client")

# Join IPA domain
self.join_domain(client, ipa)

# Create an IdP client
keycloak.kclogin()
keycloak.conn.run(
Expand All @@ -311,13 +341,18 @@ def topology_setup(self, client: ClientHost, ipa: IPAHost, keycloak: KeycloakHos
# Backup so we can restore to this state after each test
super().topology_setup()

def topology_teardown(self, ipa: IPAHost, keycloak: KeycloakHost) -> None:
def topology_teardown(self, client: ClientHost, ipa: IPAHost, keycloak: KeycloakHost) -> None:
if "gdm" not in client.features or not client.features["gdm"]:
self.logger.info(f"Topology '{self.name}' teardown skipped because gdm feature not found on client")
return

self.logger.info(f"Un-enrolling IPA server {ipa.hostname} from {keycloak.hostname} by deleting the IdP client")
keycloak.kclogin()
keycloak.conn.run(
"ID=$(/opt/keycloak/bin/kcadm.sh get clients -q clientId=ipa_oidc_client --fields=id|jq -r '.[0].id'); "
"/opt/keycloak/bin/kcadm.sh delete clients/$ID"
)

ipa.kinit()
ipa.conn.run("ipa idp-del keycloak")
super().topology_teardown()
14 changes: 9 additions & 5 deletions sssd_test_framework/utils/gdm.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __init__(self, host):
super().__init__(host)
self.init_completed = False

self.cmd = [scauto_path, "gui", "--wait-time", "1", "--no-screenshot"]
self.cmd = [scauto_path, "--verbose", "debug", "gui", "--wait-time", "1", "--no-screenshot"]

def teardown(self):
if self.init_completed:
Expand All @@ -56,10 +56,13 @@ def assert_text(self, word: str) -> bool:
if not self.init_completed:
self.init()

result = self.host.conn.exec([*self.cmd, "assert-text", word])
result = self.host.conn.exec([*self.cmd, "assert-text", word], raise_on_error=False)
if result.rc != 0:
self.logger.warn(f"Unable to find text ({word}) on screen")
return result.rc == 0

def click_on(self, word: str) -> bool:
@retry(max_retries=3, delay=1, on=AssertionError)
def click_on(self, word: str) -> None:
"""
Run scauto gui click-on

Expand All @@ -71,8 +74,9 @@ def click_on(self, word: str) -> bool:
if not self.init_completed:
self.init()

result = self.host.conn.exec([*self.cmd, "click-on", word])
return result.rc == 0
result = self.host.conn.exec([*self.cmd, "click-on", word], raise_on_error=False)
if result.rc != 0:
raise AssertionError(f"Unable to click-on target {word}")

def kb_write(self, word: str) -> bool:
"""
Expand Down