Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions citus_dev/citus_dev
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""citus_dev

Usage:
citus_dev make <name> [--size=<count>] [--port=<port>] [--use-ssl] [--no-extension] [--no-lib] [--destroy] [--init-with=<sql_file>] [--init-worker-with=<sql_file>] [--with-pgbouncer] [--fsync]
citus_dev make <name> [--size=<count>] [--port=<port>] [--use-ssl] [--no-extension] [--no-lib] [--destroy] [--init-with=<sql_file>] [--init-worker-with=<sql_file>] [--with-pgbouncer] [--fsync] [--dbname=<database_name>]
citus_dev restart <name> [--watch]
citus_dev (start|stop) <name> [--force]

Expand All @@ -19,6 +19,7 @@ Options:
--with-pgbouncer Setup pgbouncers between worker and coordinator (requires citus enterprise)
--fsync Make data in citus_dev clusters safe across computer crashes (slower)
--force Forceful shutdown
--dbname=<database_name> Database name to create and use instead of default postgres database

"""
from docopt import docopt
Expand All @@ -29,13 +30,13 @@ import subprocess
import sys
import getpass
import time
import distutils.spawn
import shutil

# for osx we might want to start postgres via a fixopen binary that preloads a
# dylib to fix the interrupted systemcall, this is done by changing the postgres
# path for pg_ctl to the fixopen binary
pgctl_flags = ""
fixopen = distutils.spawn.find_executable("postgres.fixopen")
fixopen = shutil.which("postgres.fixopen")
if fixopen:
pgctl_flags += f' -p "{fixopen}"'

Expand Down Expand Up @@ -159,22 +160,31 @@ def main(arguments):
run(f'pg_ctl {pgctl_flags} start -D {clustername}/{role}')
port = cport

# Get the database name from arguments or use default
dbname = arguments.get('--dbname')

if getpass.getuser() != 'postgres' and not os.getenv('PGDATABASE'):
for i in range(size + 1):
nodeport = port + i
run(f'createdb -p {nodeport}')
if dbname:
run(f'createdb -p {nodeport} {dbname}')
else:
run(f'createdb -p {nodeport}')

if not arguments["--no-extension"]:
# Construct database connection parameter
db_param = f' -d {dbname}' if dbname else ''

for i in range(size + 1):
nodeport = port + i
run(f'psql -p {nodeport} -c "CREATE EXTENSION citus;"')
run(f'psql -p {nodeport}{db_param} -c "CREATE EXTENSION citus;"')

run(f"psql -p {port} -c \"SELECT * from citus_set_coordinator_host('localhost', {port});\"")
run(f"psql -p {port}{db_param} -c \"SELECT * from citus_set_coordinator_host('localhost', {port});\"")

for i in range(size):
workerport = port + 1 + i
run(f"psql -p {port} -c \"SELECT * from master_add_node('localhost', {workerport});\"")
run(f'psql -p {port} -c "SELECT * from master_get_active_worker_nodes();"')
run(f"psql -p {port}{db_param} -c \"SELECT * from master_add_node('localhost', {workerport});\"")
run(f'psql -p {port}{db_param} -c "SELECT * from master_get_active_worker_nodes();"')

if pgbouncer:
# need to start pgbouncers and configure pg_dist_poolinfo
Expand All @@ -183,15 +193,15 @@ def main(arguments):
workerPort = port + i + 1
bouncerPort = port + i + 101
run(f'pgbouncer -d {clustername}/worker{i}.pgbouncer.ini')
run(f"psql -p {coordinatorPort} -c \"INSERT INTO pg_dist_poolinfo SELECT nodeid, 'host=localhost port={bouncerPort}' AS poolinfo FROM pg_dist_node WHERE nodeport = {workerPort};\"")
run(f"psql -p {coordinatorPort}{db_param} -c \"INSERT INTO pg_dist_poolinfo SELECT nodeid, 'host=localhost port={bouncerPort}' AS poolinfo FROM pg_dist_node WHERE nodeport = {workerPort};\"")


if arguments['--init-with']:
run(f'psql -p {cport} -f {arguments["--init-with"]} -v ON_ERROR_STOP=1')
run(f'psql -p {cport}{db_param} -f {arguments["--init-with"]} -v ON_ERROR_STOP=1')
if arguments['--init-worker-with']:
for i in range(size):
workerport = port + 1 + i
run(f'psql -p {workerport} -f {arguments["--init-worker-with"]} -v ON_ERROR_STOP=1')
run(f'psql -p {workerport}{db_param} -f {arguments["--init-worker-with"]} -v ON_ERROR_STOP=1')

elif arguments["stop"]:
clusterName = arguments["<name>"]
Expand Down
Loading