diff --git a/pipeline/actions/scaling.yaml b/pipeline/actions/scaling.yaml index 034ccdc..21c7b13 100644 --- a/pipeline/actions/scaling.yaml +++ b/pipeline/actions/scaling.yaml @@ -1,5 +1,5 @@ name: scaling -pack: sandbox +pack: pipeline runner_type: python-script description: Trigger scaling action via webhook on autoscaling group enabled: true diff --git a/swarm/README.md b/swarm/README.md new file mode 100755 index 0000000..984aed4 --- /dev/null +++ b/swarm/README.md @@ -0,0 +1,24 @@ +# Docker Swarm integration pack (WIP) + +Currently, the pack implements a "job" on top of docker "Swarm" service, +and contains sensor and rules for auto-scaling Swarm cluster +on top of AWS and OpenStack. + + TODO: add `docker service`, `docker swarm`, and `docker node` actions. + + + TODO: consider moving scale-swarm automation to a separate automation pack. + +**Note:** Don't forget to give `st2` user access to the Docker group `usermod -a -G docker st2`, +else docker won't work. + +To run the pack's unit tests: + +``` +# Activate existing pack's virtual environment +source /opt/stackstorm/virtualenvs/swarm/bin/activate +# Run the tests first time - it will install test dependencies +st2-run-pack-tests -x /opt/stackstorm/swarm +# Skip installing dependencies on subsequent runs +st2-run-pack-tests -x -j /opt/stackstorm/swarm +``` diff --git a/swarm/actions/job.py b/swarm/actions/job.py new file mode 100644 index 0000000..6fcf0f9 --- /dev/null +++ b/swarm/actions/job.py @@ -0,0 +1,110 @@ +from st2actions.runners.pythonrunner import Action +import random +import time + +import docker + + +class RunJobAction(Action): + + def __init__(self, config): + super(RunJobAction, self).__init__(config) + # TODO(dzimine): get URL from config + self.client = docker.DockerClient(base_url='unix://var/run/docker.sock') + self.pool_interval = 1 + + def run(self, image, command=None, args=None, mounts=None, name=None, + reserve_cpu=None, reserve_memory=None): + + name = "{}-{}".format(name, 
hex(random.getrandbits(64)).lstrip('0x')) + self.logger.info("Creating job %s", name) + + # Create service + try: + args = [str(x) for x in args] if args else None + mounts = [_parse_mount_string(mount) for mount in mounts] if mounts else None + + # TODO(dzimine): Add label + cs = docker.types.ContainerSpec( + image, command=command, args=args, mounts=mounts) + r = {'Reservations': {'MemoryBytes': reserve_memory, 'NanoCPUs': reserve_cpu}} + tt = docker.types.TaskTemplate( + cs, restart_policy={'Condition': 'none'}, resources=r) + self.logger.debug("TaskTemplate: %s", tt) + + job = self.client.api.create_service(tt, name=name) + + self.logger.info("Job %s created: %s", name, job) + + # NOTE: with `restart-condition=none`, there is only one task per replica. + # In general case polling doesn't work well as swarm restart tasks, + # keeping 5 latest tasks. + + # Poll for tasks + while True: + time.sleep(self.pool_interval) + tasks = self.client.api.tasks(filters={'service': job['ID']}) + + task = tasks[0] + status = task['Status'] + state = status['State'] + + self.logger.debug("Job %s task %s: %s", job['ID'], task['ID'], state) + + if state in ('failed', 'rejected'): + # TODO: get logs, waiting for docker-py/pull/1446 + self.logger.error("Job %s: %s", state, status['Err']) + return (False, status['Err']) + + if state == 'complete': + break + + # XXX: remove the job? Keeping for now for debugging. 
+ # self.logger.info("Removing job %s", name) + + except docker.errors.APIError as e: + self.logger.error(e) + return (False, str(e)) + + res = { + 'job': name, + 'image': image, + 'command': command, + 'args': args + } + + return (True, res) + + +def _parse_mount_string(string): + mount_kwargs = {} + try: + fields = string.split(',') + for field in fields: + pair = field.split('=', 1) + key = pair[0] + + if len(pair) == 1: + if key in ['readonly', 'ro']: + mount_kwargs['read_only'] = True + elif key == 'volume-nocopy': + mount_kwargs['no_copy'] = True + continue + + val = pair[1] + if key in ['target', 'dst', 'destination']: + target = val + elif key in ['source', 'src']: + source = val + elif key in ['readonly', 'ro']: + mount_kwargs['read_only'] = val + elif key in ['type', 'propagation']: + mount_kwargs[key] = val + elif key == 'volume-label': + k, v = val.strip("\"\'").split('=') + mount_kwargs.setdefault('labels', {}).update({k: v}) + + except Exception as e: + raise SyntaxError("Invalid mount format {0}\n{1}".format(string, e)) + + return docker.types.Mount(target, source, **mount_kwargs) diff --git a/swarm/actions/job.yaml b/swarm/actions/job.yaml new file mode 100644 index 0000000..692a311 --- /dev/null +++ b/swarm/actions/job.yaml @@ -0,0 +1,38 @@ +--- +name: "job" +pack: "swarm" +runner_type: "run-python" +description: "Run a dockerized job on swarm cluster." 
+enabled: true +entry_point: "job.py" +parameters: + image: + type: "string" + description: "Docker image" + required: true + command: + type: "string" + description: "Command" + required: false + args: + type: "array" + description: "Command arguments (array of strings)" + required: false + mounts: + type: "array" + description: "List of mounts, as strings following docker service --mount syntax" + required: false + name: + type: "string" + description: "Job name prefix (default is `job`)" + required: false + default: "job" + reserve_cpu: + type: integer + description: CPU limit in units of 10^9 CPU shares. + reserve_memory: + type: integer + description: Memory limit in Bytes. + + # TODO: expand to all docker service commands + diff --git a/swarm/actions/pending_queue.py b/swarm/actions/pending_queue.py new file mode 100644 index 0000000..e34010f --- /dev/null +++ b/swarm/actions/pending_queue.py @@ -0,0 +1,19 @@ +import docker +from st2actions.runners.pythonrunner import Action + + +class PendingQueueAction(Action): + + def __init__(self, config): + super(PendingQueueAction, self).__init__(config) + self.client = docker.DockerClient( + self.config.get('base_url', 'unix://var/run/docker.sock')) + + def run(self): + try: + + tasks = self.client.api.tasks(filters={'desired-state': 'running'}) + pending_tasks = [task for task in tasks if task['Status']['State'] == 'pending'] + return (True, {'result': {'pending_count': len(pending_tasks)}}) + except Exception as e: + return (False, str(e)) diff --git a/swarm/actions/pending_queue.yaml b/swarm/actions/pending_queue.yaml new file mode 100644 index 0000000..6f80c0a --- /dev/null +++ b/swarm/actions/pending_queue.yaml @@ -0,0 +1,7 @@ +--- +name: "pending_queue" +runner_type: "run-python" +description: "Returns count of pending Swarm tasks." 
+enabled: true +entry_point: "pending_queue.py" +parameters: {} \ No newline at end of file diff --git a/swarm/actions/scale_openstack.py b/swarm/actions/scale_openstack.py new file mode 100644 index 0000000..7ba4705 --- /dev/null +++ b/swarm/actions/scale_openstack.py @@ -0,0 +1,46 @@ +import json +import requests + +from st2actions.runners.pythonrunner import Action + + +class ScalingAction(Action): + + def run(self, auth_endpoint, username, api_key, webhook): + + # Authenticate with server. + + headers = {'Content-Type': 'application/json'} + + auth_data = { + 'auth': { + 'RAX-KSKEY:apiKeyCredentials': { + 'username': username, + 'apiKey': api_key + } + } + } + + response = requests.post(auth_endpoint, headers=headers, data=json.dumps(auth_data)) + + if response.status_code != 200: + self.logger.error('Unable to authenticate.') + self.logger.error('Status code "%s" returned.' % response.status_code) + raise Exception(response.text) + + auth_result = response.json() + + if (not auth_result.get('access', None) or + not auth_result.get('access').get('token', None) or + not auth_result.get('access').get('token').get('id', None)): + raise Exception('Unable to find auth token.') + + # Make webhook call to execute the scaling action. 
+ + auth_token = auth_result['access']['token']['id'] + + headers = {'X-Auth-Token': auth_token} + + response = requests.post(webhook, headers=headers) + + return response.text diff --git a/swarm/actions/scale_openstack.yaml b/swarm/actions/scale_openstack.yaml new file mode 100644 index 0000000..34f9019 --- /dev/null +++ b/swarm/actions/scale_openstack.yaml @@ -0,0 +1,22 @@ +name: scale_openstack +pack: swarm +runner_type: python-script +description: Trigger scaling action via webhook on autoscaling group +enabled: true +entry_point: scale_openstack.py +parameters: + auth_endpoint: + type: string + required: true + default: "{{st2kv.system.rax_auth_endpoint}}" + username: + type: string + required: true + default: "{{st2kv.system.rax_auth_username}}" + api_key: + type: string + required: true + default: "{{st2kv.system.rax_auth_apikey}}" + webhook: + type: string + required: true diff --git a/swarm/config.schema.yaml b/swarm/config.schema.yaml new file mode 100755 index 0000000..667131c --- /dev/null +++ b/swarm/config.schema.yaml @@ -0,0 +1,18 @@ +--- + base_url: + description: "Docker API URL" + type: string + required: true + default: "unix://var/run/docker.sock" + swarm_pending_tasks_threshold: + description: Trigger only when pending tasks count above the threshold. + type: integer + required: false + default: 0 + swarm_pending_tasks_polling_interval: + description: Interval for polling swarm api for tasks, seconds. 
+ type: integer + required: true + default: 5 + + diff --git a/swarm/icon.png b/swarm/icon.png new file mode 100644 index 0000000..6130a06 Binary files /dev/null and b/swarm/icon.png differ diff --git a/swarm/pack.yaml b/swarm/pack.yaml new file mode 100755 index 0000000..8f93b4f --- /dev/null +++ b/swarm/pack.yaml @@ -0,0 +1,10 @@ +--- +name: swarm +ref: swarm +description: Docker Swarm integration pack +keywords: + - docker + - automation +version : 0.1.0 +author : Dmitri Zimin(e) +email : dz@stackstorm.com diff --git a/swarm/requirements.txt b/swarm/requirements.txt new file mode 100755 index 0000000..bdb9670 --- /dev/null +++ b/swarm/requirements.txt @@ -0,0 +1 @@ +docker diff --git a/swarm/rules/on_pending_scaleup_aws.yaml b/swarm/rules/on_pending_scaleup_aws.yaml new file mode 100644 index 0000000..1058e9f --- /dev/null +++ b/swarm/rules/on_pending_scaleup_aws.yaml @@ -0,0 +1,21 @@ +--- +name: on_pending_scaleup_aws +pack: swarm +description: "Scale up the Swarm on AWS when pending tasks go over threshold." +enabled: True + +trigger: + type: swarm.pending_tasks + parameters: {} + +criteria: + # Crossing threshold up + trigger.over_threshold: + type: equals + pattern: True + +action: + ref: aws.autoscaling_update_auto_scaling_group + parameters: + AutoScalingGroupName: swarm-workers-tf + DesiredCapacity: "{{ st2kv.system.asg.workers }}" diff --git a/swarm/rules/on_pending_scaleup_openstack.yaml b/swarm/rules/on_pending_scaleup_openstack.yaml new file mode 100644 index 0000000..f5b8520 --- /dev/null +++ b/swarm/rules/on_pending_scaleup_openstack.yaml @@ -0,0 +1,20 @@ +--- +name: on_pending_scaleup_openstack +pack: swarm +description: "Scale up the Swarm when pending tasks go over threshold." 
+enabled: False + +trigger: + type: swarm.pending_tasks + parameters: {} + +criteria: + # Crossing threshold up + trigger.over_threshold: + type: equals + pattern: True + +action: + ref: swarm.scale_openstack + parameters: + webhook: "https://webhook_url_specific_to_the_asg_for_scale_up" diff --git a/swarm/rules/scale_in_test.yaml b/swarm/rules/scale_in_test.yaml new file mode 100644 index 0000000..6018d66 --- /dev/null +++ b/swarm/rules/scale_in_test.yaml @@ -0,0 +1,21 @@ +--- +name: scale_in_test +pack: swarm +description: "Test scale-in rule, fires on swarm pending task queue count going DOWN BELOW threshold" +enabled: True + +trigger: + type: swarm.pending_tasks + parameters: {} + +criteria: + # Crossing threshold down + trigger.over_threshold: + type: equals + pattern: False + +action: + ref: core.local + parameters: + cmd: "echo Pending queue going down: {{ trigger.count }}" + diff --git a/swarm/rules/scale_out_test.yaml b/swarm/rules/scale_out_test.yaml new file mode 100644 index 0000000..d0c7243 --- /dev/null +++ b/swarm/rules/scale_out_test.yaml @@ -0,0 +1,21 @@ +--- +name: scale_out_test +pack: swarm +description: "Test scale-out rule, fires on swarm pending task queue count going UP ABOVE threshold" +enabled: True + +trigger: + type: swarm.pending_tasks + parameters: {} + +criteria: + # Crossing threshold up + trigger.over_threshold: + type: equals + pattern: True + +action: + ref: core.local + parameters: + cmd: "echo Pending queue going up: {{ trigger.count }}" + diff --git a/swarm/sensors/pending_queue.py b/swarm/sensors/pending_queue.py new file mode 100644 index 0000000..211bdd9 --- /dev/null +++ b/swarm/sensors/pending_queue.py @@ -0,0 +1,70 @@ +from st2reactor.sensor.base import PollingSensor + +import docker + +TRIGGER = "swarm.pending_tasks" + + +class SwarmPendingTasksSensor(PollingSensor): + """ + Swarm pending tasks queue sensor. 
+ + Sensor polls docker swarm for the `pending` tasks + and dispatches the count and the list + """ + + def __init__(self, sensor_service, config=None): + super(SwarmPendingTasksSensor, self).__init__(sensor_service=sensor_service, + config=config) + self.poll_interval = config.get('swarm_pending_tasks_polling_interval', 5) if config else 5 + self._logger = self.sensor_service.get_logger(name=self.__class__.__name__) + self.over_threshold = False + + def setup(self): + self.client = docker.DockerClient(base_url=self._config.get('base_url')) + self.threshold = self._config.get('swarm_pending_tasks_threshold', 0) + + def poll(self): + tasks = self.client.api.tasks(filters={'desired-state': 'running'}) + pending_tasks = [task for task in tasks if task['Status']['State'] == 'pending'] + pending_count = len(pending_tasks) + + def _payload(): + return { + "count": pending_count, + "over_threshold": self.over_threshold, + "tasks": pending_tasks + } + + if not self.over_threshold and pending_count > self.threshold: + self.over_threshold = True + self._logger.info( + 'Dispatching trigger %s, UP ABOVE threshold, payload=%s', TRIGGER, _payload()) + # TODO: add trace_tag + self._sensor_service.dispatch(trigger=TRIGGER, payload=_payload()) + elif self.over_threshold and pending_count <= self.threshold: + self.over_threshold = False + self._logger.info( + 'Dispatching trigger %s, DOWN BELOW threshold, payload=%s', TRIGGER, _payload()) + self._sensor_service.dispatch(trigger=TRIGGER, payload=_payload()) + else: + self._logger.debug( + 'Not dispatching trigger %s: over=%s threshold=%d count=%d ', + TRIGGER, self.over_threshold, self.threshold, pending_count) + + def cleanup(self): + # This is called when the st2 system goes down. You can perform cleanup operations like + # closing the connections to external system here. 
+ pass + + def add_trigger(self, trigger): + # This method is called when trigger is created + pass + + def update_trigger(self, trigger): + # This method is called when trigger is updated + pass + + def remove_trigger(self, trigger): + # This method is called when trigger is deleted + pass diff --git a/swarm/sensors/pending_queue.yaml b/swarm/sensors/pending_queue.yaml new file mode 100644 index 0000000..4880fde --- /dev/null +++ b/swarm/sensors/pending_queue.yaml @@ -0,0 +1,20 @@ +class_name: SwarmPendingTasksSensor +entry_point: pending_queue.py +description: "Swarm pending queue sensor" +trigger_types: + - + name: pending_tasks + description: "Swarm pending tasks count cross over/under threshold" + payload_schema: + type: object + properties: + count: + type: integer + description: Number of pending swarm tasks. + over_threshold: + type: boolean + description: True if going up above threshold, False if going down below. + tasks: + type: array + default: [] + description: List of swarm pending tasks. 
diff --git a/swarm/tests/fixtures/config.yaml b/swarm/tests/fixtures/config.yaml new file mode 100644 index 0000000..0915571 --- /dev/null +++ b/swarm/tests/fixtures/config.yaml @@ -0,0 +1,4 @@ +--- +base_url: "unix://var/run/docker.sock" +swarm_pending_tasks_threshold: 1 +swarm_pending_tasks_polling_interval: 5 diff --git a/swarm/tests/test_job.py b/swarm/tests/test_job.py new file mode 100644 index 0000000..03e310a --- /dev/null +++ b/swarm/tests/test_job.py @@ -0,0 +1,111 @@ +from unittest2 import TestCase +from unittest2 import skip +from st2tests.base import BaseActionTestCase + +import mock + +import job + + +class ParseMountStringTestCase(TestCase): + + def test_parse_mount_string_service_defaults(self): + mount = job._parse_mount_string( + "source=/foo/bar,target=/abc/xyz") + self.assertEqual(mount['Source'], '/foo/bar') + self.assertEqual(mount['Target'], '/abc/xyz') + self.assertEqual(mount['Type'], 'volume') + # Note(dzimine): blocked by https://github.com/docker/docker-py/issues/1371 + # self.assertEqual(mount['ReadOnly'], False) + + def test_parse_mount_string_service(self): + mount = job._parse_mount_string( + "type=bind,src=/foo/bar,dst=/abc/xyz,readonly,propagation=slave") + self.assertEqual(mount['Source'], '/foo/bar') + self.assertEqual(mount['Target'], '/abc/xyz') + self.assertEqual(mount['Type'], 'bind') + # Note(dzimine): blocked by https://github.com/docker/docker-py/issues/1371 + # self.assertEqual(mount['ReadOnly'], False) + self.assertEqual(mount['BindOptions']['Propagation'], 'slave') + + def test_parse_mount_string_service_advanced(self): + mount = job._parse_mount_string( + "src=/foo/bar,dst=/abc/xyz,ro,volume-label=color1=red," + "volume-label='color2=blue'") + self.assertEqual(mount['Source'], '/foo/bar') + self.assertEqual(mount['Target'], '/abc/xyz') + # Note(dzimine): blocked by https://github.com/docker/docker-py/issues/1371 + # self.assertEqual(mount['ReadOnly'], False) + self.assertDictEqual( + mount['VolumeOptions']['Labels'], + 
{'color1': 'red', 'color2': 'blue'}) + + +class RunJobActionTestCase(BaseActionTestCase): + + action_cls = job.RunJobAction + + @mock.patch('docker.api.APIClient.create_service') + @mock.patch('docker.api.APIClient.tasks') + def test_run(self, mock_tasks, mock_create): + mock_create.return_value = {"ID": "1111"} + mock_tasks.return_value = [{'ID': '111', 'Status': {'State': 'complete'}}] + + action = self.get_action_instance() + action.pool_interval = 0 + result = action.run( + image="alpine", command=None, args=['ping', '192.168.1.1'], + mounts=["source=/foo/bar,dst=/bar,type=bind", "src=foo,dst=bar"], + name="myjob", reserve_cpu=4, reserve_memory=536870912) + + expected = { + 'ContainerSpec': { + 'Args': ['ping', '192.168.1.1'], + 'Command': None, + 'Image': 'alpine', + 'Mounts': [ + {'Source': '/foo/bar', 'Target': '/bar', 'Type': 'bind', 'ReadOnly': False}, + {'Source': 'foo', 'Target': 'bar', 'Type': 'volume', 'ReadOnly': False} + ] + + }, + 'RestartPolicy': {'Condition': 'none'}, + 'Resources': {'Reservations': {'MemoryBytes': 536870912, 'NanoCPUs': 4}} + } + mock_create.assert_called_once_with(expected, name=result[1]['job']) + mock_tasks.assert_called_with(filters={'service': '1111'}) + + @mock.patch('docker.api.APIClient.create_service') + @mock.patch('docker.api.APIClient.tasks') + def test_run_with_defaults(self, mock_tasks, mock_create): + mock_create.return_value = {"ID": "1111"} + mock_tasks.return_value = [{'ID': '111', 'Status': {'State': 'complete'}}] + + action = self.get_action_instance() + action.pool_interval = 0 + result = action.run(image="alpine") + + expected = { + 'ContainerSpec': { + 'Image': 'alpine', + 'Command': None, + 'Args': None + }, + 'RestartPolicy': {'Condition': 'none'}, + 'Resources': {'Reservations': {'MemoryBytes': None, 'NanoCPUs': None}} + } + mock_create.assert_called_once_with(expected, name=result[1]['job']) + mock_tasks.assert_called_with(filters={'service': '1111'}) + + @skip("Only run on real swarm") + def 
test_run_real(self): + image = "pregistry:5000/encode" + command = None + args = ["-i/share/li.txt", "-o/share/li.out", "--delay", "10"] + mounts = ["source=/vagrant/share,dst=/share,type=bind", "src=share,dst=/bar"] + + action = self.get_action_instance() + result = action.run( + image=image, command=command, args=args, + mounts=mounts) + print result diff --git a/swarm/tests/test_sensor_pending_queue.py b/swarm/tests/test_sensor_pending_queue.py new file mode 100644 index 0000000..f5f0b58 --- /dev/null +++ b/swarm/tests/test_sensor_pending_queue.py @@ -0,0 +1,75 @@ +import mock +from unittest2 import skip +import yaml + +from st2tests.base import BaseSensorTestCase + +import pending_queue + + +class SwarmPendingTasksSensorTestCase(BaseSensorTestCase): + sensor_cls = pending_queue.SwarmPendingTasksSensor + + def setUp(self): + super(SwarmPendingTasksSensorTestCase, self).setUp() + self.config = yaml.safe_load(self.get_fixture_content("config.yaml")) + + @mock.patch('docker.api.APIClient.tasks') + def test_poll_empty(self, mock_tasks): + mock_tasks.return_value = [] + sensor = self.get_sensor_instance(config=self.config) + sensor.setup() + sensor.poll() + self.assertEqual(self.get_dispatched_triggers(), []) + + @mock.patch('docker.api.APIClient.tasks') + def test_poll(self, mock_tasks): + mock_tasks.return_value = [ + {'ID': '111', 'Status': {'State': 'pending'}}, + {'ID': '112', 'Status': {'State': 'pending'}}, + {'ID': '113', 'Status': {'State': 'pending'}}, + ] + sensor = self.get_sensor_instance() + sensor.setup() + sensor.poll() + self.assertTriggerDispatched(trigger=pending_queue.TRIGGER) + self.assertEqual( + self.get_dispatched_triggers()[0]['payload']['count'], + len(mock_tasks.return_value)) + self.assertEqual( + self.get_dispatched_triggers()[0]['payload']['tasks'], + mock_tasks.return_value) + self.assertEqual( + self.get_dispatched_triggers()[0]['payload']['over_threshold'], True) + # Check that trigger dispatch only once + sensor.poll() + 
self.assertEqual(len(self.get_dispatched_triggers()), 1) + + @mock.patch('docker.api.APIClient.tasks') + def test_going_below_threshlold(self, mock_tasks): + mock_tasks.return_value = [{'ID': '111', 'Status': {'State': 'pending'}}] + sensor = self.get_sensor_instance() + sensor.setup() + sensor.threshold = 2 + sensor.over_threshold = True + sensor.poll() + # print sensor._logger.mock_calls + self.assertTriggerDispatched(trigger=pending_queue.TRIGGER) + self.assertEqual( + self.get_dispatched_triggers()[0]['payload']['over_threshold'], False) + + @mock.patch('docker.api.APIClient.tasks') + def test_poll_threshold(self, mock_tasks): + mock_tasks.return_value = [{'ID': '111', 'Status': {'State': 'pending'}}] + sensor = self.get_sensor_instance(config=self.config) + sensor.setup() + sensor.poll() + self.assertEqual(self.get_dispatched_triggers(), []) + + @skip("Only run on real swarm, with pending tasks") + def test_poll_real(self): + sensor = self.get_sensor_instance() + sensor.setup() + sensor.poll() + self.assertTriggerDispatched(trigger=pending_queue.TRIGGER) + print sensor._logger.mock_calls