123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318 |
- from typing import Tuple
- import psycopg2
- import psutil
- import os
- import signal
- import time
# Shutdown timeout, in seconds, that the shutdown test waits out (plus one
# second) before expecting pgcat to have closed an open transaction.
# NOTE(review): presumably mirrors the value in .circleci/pgcat.toml - confirm.
SHUTDOWN_TIMEOUT = 5

# Address of the pgcat instance under test. The port is kept as a string
# because it is only ever interpolated into the connection URL.
PGCAT_HOST = "127.0.0.1"
PGCAT_PORT = "6432"
def pgcat_start():
    """Start a fresh pgcat instance in the background.

    Any pgcat process that is already running is sent SIGTERM first, so
    each caller begins with a new server. The binary is launched through
    the shell with a trailing ``&`` so this call does not block on it, and
    a fixed two-second pause gives the server time to come up.

    Raises:
        Exception: propagated from ``pg_cat_send_signal`` if a previous
            pgcat process is still alive after SIGTERM.
    """
    pg_cat_send_signal(signal.SIGTERM)
    os.system("./target/debug/pgcat .circleci/pgcat.toml &")
    time.sleep(2)
def pg_cat_send_signal(signal: signal.Signals):
    """Send ``signal`` to every running process named exactly ``pgcat``.

    Delivery is best effort: a process that disappears (or cannot be
    signalled) is reported on stdout and the remaining processes are still
    signalled.

    Args:
        signal: The signal to deliver, e.g. ``signal.SIGINT``.

    Raises:
        Exception: if ``signal`` is SIGTERM and ``pgrep pgcat`` still finds
            a process two seconds after the signal was sent.
    """
    # The parameter shadows the ``signal`` module inside this function, so
    # bind the module under another name instead of looking SIGTERM up on
    # the argument (which breaks for a plain int signal number).
    import signal as signal_module

    for proc in psutil.process_iter(["pid", "name"]):
        # Guard each process separately so one vanished process does not
        # prevent the others from being signalled.
        try:
            if proc.name() == "pgcat":
                os.kill(proc.pid, signal)
        except (psutil.Error, OSError) as e:
            # The process can be gone when we send this signal
            print(e)

    if signal == signal_module.SIGTERM:
        time.sleep(2)
        # os.system returns 0 if a pgcat process still exists
        if not os.system('pgrep pgcat'):
            raise Exception("pgcat not closed after SIGTERM")
def connect_db(
    autocommit: bool = True,
    admin: bool = False,
) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
    """Open a connection to pgcat and return it together with a cursor.

    Args:
        autocommit: Value assigned to the connection's ``autocommit`` flag.
        admin: When true, connect as ``admin_user`` to the ``pgcat`` admin
            database; otherwise connect as ``sharding_user`` to
            ``sharded_db``.

    Returns:
        A ``(connection, cursor)`` pair; pass both to ``cleanup_conn`` when
        finished.

    Raises:
        psycopg2.OperationalError: if the connection cannot be established
            (the connect timeout is two seconds).
    """
    if admin:
        user = "admin_user"
        password = "admin_pass"
        db = "pgcat"
    else:
        user = "sharding_user"
        password = "sharding_user"
        db = "sharded_db"

    conn = psycopg2.connect(
        f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat",
        connect_timeout=2,
    )
    conn.autocommit = autocommit
    cur = conn.cursor()

    return (conn, cur)
def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
    """Close a cursor and the connection it belongs to.

    Args:
        conn: Connection returned by ``connect_db``.
        cur: Cursor returned by ``connect_db``.
    """
    try:
        cur.close()
    finally:
        # Close the connection even if closing the cursor raised, so the
        # socket is never leaked.
        conn.close()
def test_normal_db_access():
    """Check that a regular (non-admin) client can run a query via pgcat."""
    conn, cur = connect_db(autocommit=False)
    try:
        cur.execute("SELECT 1")
        res = cur.fetchall()
        print(res)
    finally:
        # Release the connection even when the query fails.
        cleanup_conn(conn, cur)
def test_admin_db_access():
    """Check that an admin client can run ``SHOW POOLS`` against pgcat."""
    conn, cur = connect_db(admin=True)
    try:
        cur.execute("SHOW POOLS")
        res = cur.fetchall()
        print(res)
    finally:
        # Release the connection even when the query fails.
        cleanup_conn(conn, cur)
def _begin_and_select(cur):
    """Open an explicit transaction on ``cur`` and run one query inside it."""
    cur.execute("BEGIN;")
    cur.execute("SELECT 1;")


def _expect_query_rejected(cur, failure_message):
    """Run ``SELECT 1`` and raise ``failure_message`` unless it fails."""
    try:
        cur.execute("SELECT 1;")
    except psycopg2.OperationalError:
        pass
    else:
        # Fail if query execution succeeded
        raise Exception(failure_message)


def _expect_query_accepted(cur):
    """Run ``SELECT 1`` and raise if the server has already gone away."""
    try:
        cur.execute("SELECT 1;")
    except psycopg2.OperationalError as e:
        # Fail if query fails since server closed
        raise Exception("Server closed while in transaction", e.pgerror) from e


def _check_sigint_without_active_queries():
    """SIGINT with no open transaction should close the server promptly."""
    pgcat_start()

    # Run a complete transaction so nothing is left open
    conn, cur = connect_db()
    _begin_and_select(cur)
    cur.execute("COMMIT;")

    pg_cat_send_signal(signal.SIGINT)
    time.sleep(1)

    # New queries must fail: with no active transactions the server closes
    _expect_query_rejected(cur, "Server not closed after sigint")

    cleanup_conn(conn, cur)
    pg_cat_send_signal(signal.SIGTERM)


def _check_admin_shutdown_without_active_queries():
    """Admin ``SHUTDOWN`` with no open transaction should close the server."""
    pgcat_start()

    # Run a complete transaction so nothing is left open
    conn, cur = connect_db()
    admin_conn, admin_cur = connect_db(admin=True)
    _begin_and_select(cur)
    cur.execute("COMMIT;")

    # Send SHUTDOWN command to pgcat while not in a transaction
    admin_cur.execute("SHUTDOWN;")
    time.sleep(1)

    # New queries must fail: with no active transactions the server closes
    _expect_query_rejected(cur, "Server not closed after SHUTDOWN command")

    cleanup_conn(conn, cur)
    cleanup_conn(admin_conn, admin_cur)
    pg_cat_send_signal(signal.SIGTERM)


def _check_sigint_during_transaction():
    """SIGINT must let a transaction that is already open keep running."""
    pgcat_start()

    conn, cur = connect_db()
    _begin_and_select(cur)

    # Send sigint to pgcat while still in transaction
    pg_cat_send_signal(signal.SIGINT)
    time.sleep(1)

    # Queries inside the open transaction must still succeed
    _expect_query_accepted(cur)

    cleanup_conn(conn, cur)
    pg_cat_send_signal(signal.SIGTERM)


def _check_admin_shutdown_during_transaction():
    """Admin ``SHUTDOWN`` must let an already-open transaction keep running."""
    pgcat_start()

    conn, cur = connect_db()
    admin_conn, admin_cur = connect_db(admin=True)
    _begin_and_select(cur)

    # Send SHUTDOWN command to pgcat while still in transaction
    admin_cur.execute("SHUTDOWN;")
    if admin_cur.fetchall()[0][0] != "t":
        raise Exception("PgCat unable to send signal")
    time.sleep(1)

    # Queries inside the open transaction must still succeed
    _expect_query_accepted(cur)

    cleanup_conn(conn, cur)
    cleanup_conn(admin_conn, admin_cur)
    pg_cat_send_signal(signal.SIGTERM)


def _check_new_client_rejected_during_shutdown():
    """While shutting down, new non-admin connections are refused quickly."""
    pgcat_start()

    # Keep a transaction open so the server stays alive after SIGINT
    transaction_conn, transaction_cur = connect_db()
    _begin_and_select(transaction_cur)

    pg_cat_send_signal(signal.SIGINT)
    time.sleep(1)

    start = time.perf_counter()
    try:
        conn, cur = connect_db()
        cur.execute("SELECT 1;")
        cleanup_conn(conn, cur)
    except psycopg2.OperationalError:
        # Rejection is expected, but it has to be prompt
        time_taken = time.perf_counter() - start
        if time_taken > 0.1:
            raise Exception(
                "Failed to reject connection within 0.1 seconds, got", time_taken, "seconds")
    else:
        raise Exception("Able connect to database during shutdown")

    cleanup_conn(transaction_conn, transaction_cur)
    pg_cat_send_signal(signal.SIGTERM)


def _check_new_admin_allowed_during_shutdown():
    """While shutting down, new admin connections are still accepted."""
    pgcat_start()

    # Keep a transaction open so the server stays alive after SIGINT
    transaction_conn, transaction_cur = connect_db()
    _begin_and_select(transaction_cur)

    pg_cat_send_signal(signal.SIGINT)
    time.sleep(1)

    try:
        conn, cur = connect_db(admin=True)
        cur.execute("SHOW DATABASES;")
        cleanup_conn(conn, cur)
    except psycopg2.OperationalError as e:
        raise Exception(e) from e

    cleanup_conn(transaction_conn, transaction_cur)
    pg_cat_send_signal(signal.SIGTERM)


def _check_existing_admin_survives_shutdown():
    """An admin connection opened before SIGINT keeps working afterwards."""
    pgcat_start()

    # Keep a transaction open so the server stays alive after SIGINT
    transaction_conn, transaction_cur = connect_db()
    _begin_and_select(transaction_cur)

    admin_conn, admin_cur = connect_db(admin=True)
    admin_cur.execute("SHOW DATABASES;")

    pg_cat_send_signal(signal.SIGINT)
    time.sleep(1)

    try:
        admin_cur.execute("SHOW DATABASES;")
    except psycopg2.OperationalError as e:
        raise Exception("Could not execute admin command:", e) from e

    cleanup_conn(transaction_conn, transaction_cur)
    cleanup_conn(admin_conn, admin_cur)
    pg_cat_send_signal(signal.SIGTERM)


def _check_sigint_shutdown_timeout():
    """An open transaction only delays shutdown until the timeout expires."""
    pgcat_start()

    # The open transaction should prevent server shutdown until the
    # shutdown timeout is reached
    conn, cur = connect_db()
    _begin_and_select(cur)

    pg_cat_send_signal(signal.SIGINT)

    # pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep
    # for SHUTDOWN_TIMEOUT + 1 seconds
    time.sleep(SHUTDOWN_TIMEOUT + 1)

    # After the timeout the server should be gone, so the query must fail
    _expect_query_rejected(
        cur, "Server not closed after sigint and expected timeout")

    cleanup_conn(conn, cur)
    pg_cat_send_signal(signal.SIGTERM)


def test_shutdown_logic():
    """Exercise pgcat's graceful-shutdown behaviour end to end.

    Each scenario starts its own pgcat instance, triggers a shutdown via
    SIGINT or the admin ``SHUTDOWN`` command, checks the expected behaviour
    and finally sends SIGTERM to the server. Scenarios run in a fixed order
    and the first failure aborts the run.

    Raises:
        Exception: if any scenario observes behaviour other than expected.
    """
    _check_sigint_without_active_queries()
    _check_admin_shutdown_without_active_queries()
    _check_sigint_during_transaction()
    _check_admin_shutdown_during_transaction()
    _check_new_client_rejected_during_shutdown()
    _check_new_admin_allowed_during_shutdown()
    _check_existing_admin_survives_shutdown()
    _check_sigint_shutdown_timeout()
# Run the checks in order when the file is executed; the first failing check
# raises and stops the run.
test_normal_db_access()
test_admin_db_access()
test_shutdown_logic()
|