# tests.py (8.7 KB) - checks that exercise pgcat query access and shutdown handling.

  1. from typing import Tuple
  2. import psycopg2
  3. import psutil
  4. import os
  5. import signal
  6. import time
  # Shutdown timeout, in seconds, that the pgcat instance under test is expected
  # to use; the timeout scenario sleeps one second longer than this value.
  SHUTDOWN_TIMEOUT = 5

  # Address used to reach pgcat. The port stays a string because it is only ever
  # interpolated into the connection URL.
  PGCAT_HOST, PGCAT_PORT = "127.0.0.1", "6432"
  def pgcat_start():
      """Restart pgcat: terminate any running instance, then launch a new one.

      The debug binary is launched in the background through the shell with the
      CircleCI configuration file. The function then sleeps for two seconds
      before returning (presumably to let the new process come up).
      """
      launch_command = "./target/debug/pgcat .circleci/pgcat.toml &"
      startup_grace_seconds = 2

      # Make sure no instance from an earlier scenario is still around.
      pg_cat_send_signal(signal.SIGTERM)
      os.system(launch_command)
      time.sleep(startup_grace_seconds)
  def pg_cat_send_signal(signal: signal.Signals):
      """Send ``signal`` to every running process whose name is exactly ``pgcat``.

      Delivery is best effort: a process that disappears (or cannot be
      signalled) is reported with ``print`` and the remaining processes are
      still signalled. After a SIGTERM the function waits two seconds and then
      uses ``pgrep`` to verify that no pgcat process is left.

      Args:
          signal: The signal to deliver, e.g. ``signal.SIGINT``.

      Raises:
          Exception: If a pgcat process still exists two seconds after SIGTERM.
      """
      # The parameter shadows the ``signal`` module inside this function, so the
      # module is imported under a second name. Looking up ``SIGTERM`` on the
      # passed-in enum member instead is not supported on every Python version.
      import signal as signal_module

      for proc in psutil.process_iter(["pid", "name"]):
          # Guard each process separately so that one vanished process does not
          # stop the signal from reaching the others.
          try:
              if proc.name() == "pgcat":
                  os.kill(proc.pid, signal)
          except (psutil.Error, OSError) as e:
              # The process can be gone when we send this signal
              print(e)

      if signal == signal_module.SIGTERM:
          time.sleep(2)
          # pgrep exits with status 0 when a matching process exists
          if os.system('pgrep pgcat') == 0:
              raise Exception("pgcat not closed after SIGTERM")
  def connect_db(
      autocommit: bool = True,
      admin: bool = False,
  ) -> Tuple[psycopg2.extensions.connection, psycopg2.extensions.cursor]:
      """Open a connection to pgcat and return it together with a fresh cursor.

      Args:
          autocommit: Value assigned to the connection's ``autocommit`` flag.
          admin: When true, log in as ``admin_user`` to the ``pgcat`` database;
              otherwise log in as ``sharding_user`` to ``sharded_db``.

      Returns:
          A ``(connection, cursor)`` pair.
      """
      if admin:
          user, password, db = "admin_user", "admin_pass", "pgcat"
      else:
          user, password, db = "sharding_user", "sharding_user", "sharded_db"

      dsn = f"postgres://{user}:{password}@{PGCAT_HOST}:{PGCAT_PORT}/{db}?application_name=testing_pgcat"
      conn = psycopg2.connect(dsn, connect_timeout=2)
      conn.autocommit = autocommit
      return conn, conn.cursor()
  def cleanup_conn(conn: psycopg2.extensions.connection, cur: psycopg2.extensions.cursor):
      """Close the cursor first and then the connection it belongs to."""
      for handle in (cur, conn):
          handle.close()
  def test_normal_db_access():
      """Run ``SELECT 1`` through a non-admin connection and print the rows.

      The connection is opened with autocommit disabled. Any error raised by
      the driver propagates to the caller.
      """
      conn, cur = connect_db(autocommit=False)
      try:
          cur.execute("SELECT 1")
          res = cur.fetchall()
          print(res)
      finally:
          # Release the cursor and connection even when the query fails.
          cleanup_conn(conn, cur)
  def test_admin_db_access():
      """Run ``SHOW POOLS`` through an admin connection and print the rows.

      Any error raised by the driver propagates to the caller.
      """
      conn, cur = connect_db(admin=True)
      try:
          cur.execute("SHOW POOLS")
          res = cur.fetchall()
          print(res)
      finally:
          # Release the cursor and connection even when the command fails.
          cleanup_conn(conn, cur)
  def _begin_transaction():
      """Open a client connection and leave it inside an open transaction."""
      conn, cur = connect_db()
      cur.execute("BEGIN;")
      cur.execute("SELECT 1;")
      return conn, cur


  def _run_committed_transaction(cur):
      """Run one complete transaction on ``cur`` so the client ends up idle."""
      cur.execute("BEGIN;")
      cur.execute("SELECT 1;")
      cur.execute("COMMIT;")


  def _expect_query_rejected(cur, failure_message):
      """Raise ``Exception(failure_message)`` unless ``SELECT 1;`` fails on ``cur``."""
      try:
          cur.execute("SELECT 1;")
      except psycopg2.OperationalError:
          return
      # The query went through, so the server is still serving this client.
      raise Exception(failure_message)


  def _expect_query_accepted(cur):
      """Raise unless ``SELECT 1;`` still succeeds on the open transaction."""
      try:
          cur.execute("SELECT 1;")
      except psycopg2.OperationalError as e:
          # Fail if query fails since server closed
          raise Exception("Server closed while in transaction", e.pgerror) from e


  def _check_idle_client_closed_by_sigint():
      """No active queries + SIGINT: the next query must fail."""
      pgcat_start()

      # Create client connection and send query (not in transaction afterwards)
      conn, cur = connect_db()
      _run_committed_transaction(cur)

      pg_cat_send_signal(signal.SIGINT)
      time.sleep(1)

      # The server should close since there are no active transactions
      _expect_query_rejected(cur, "Server not closed after sigint")

      cleanup_conn(conn, cur)
      pg_cat_send_signal(signal.SIGTERM)


  def _check_idle_client_closed_by_admin_shutdown():
      """No active queries + admin SHUTDOWN command: the next query must fail."""
      pgcat_start()

      conn, cur = connect_db()
      admin_conn, admin_cur = connect_db(admin=True)
      _run_committed_transaction(cur)

      # Send SHUTDOWN command to pgcat while not in transaction
      admin_cur.execute("SHUTDOWN;")
      time.sleep(1)

      # The server should close since there are no active transactions
      _expect_query_rejected(cur, "Server not closed after SHUTDOWN command")

      cleanup_conn(conn, cur)
      cleanup_conn(admin_conn, admin_cur)
      pg_cat_send_signal(signal.SIGTERM)


  def _check_transaction_survives_sigint():
      """Open transaction + SIGINT: the transaction may keep issuing queries."""
      pgcat_start()

      conn, cur = _begin_transaction()

      # Send sigint to pgcat while still in transaction
      pg_cat_send_signal(signal.SIGINT)
      time.sleep(1)

      _expect_query_accepted(cur)

      cleanup_conn(conn, cur)
      pg_cat_send_signal(signal.SIGTERM)


  def _check_transaction_survives_admin_shutdown():
      """Open transaction + admin SHUTDOWN: the transaction may keep going."""
      pgcat_start()

      conn, cur = connect_db()
      admin_conn, admin_cur = connect_db(admin=True)
      cur.execute("BEGIN;")
      cur.execute("SELECT 1;")

      # Send SHUTDOWN command to pgcat while still in transaction
      admin_cur.execute("SHUTDOWN;")
      if admin_cur.fetchall()[0][0] != "t":
          raise Exception("PgCat unable to send signal")
      time.sleep(1)

      _expect_query_accepted(cur)

      cleanup_conn(conn, cur)
      cleanup_conn(admin_conn, admin_cur)
      pg_cat_send_signal(signal.SIGTERM)


  def _check_new_client_rejected_during_shutdown():
      """During shutdown a new non-admin connection must be refused quickly."""
      pgcat_start()

      # An open transaction keeps pgcat alive while it is shutting down
      transaction_conn, transaction_cur = _begin_transaction()

      pg_cat_send_signal(signal.SIGINT)
      time.sleep(1)

      start = time.perf_counter()
      try:
          conn, cur = connect_db()
          cur.execute("SELECT 1;")
          cleanup_conn(conn, cur)
      except psycopg2.OperationalError as e:
          # Being refused is expected, but it has to happen promptly
          time_taken = time.perf_counter() - start
          if time_taken > 0.1:
              raise Exception(
                  "Failed to reject connection within 0.1 seconds, got", time_taken, "seconds") from e
      else:
          raise Exception("Able connect to database during shutdown")

      cleanup_conn(transaction_conn, transaction_cur)
      pg_cat_send_signal(signal.SIGTERM)


  def _check_new_admin_allowed_during_shutdown():
      """During shutdown a new admin connection must still be accepted."""
      pgcat_start()

      # An open transaction keeps pgcat alive while it is shutting down
      transaction_conn, transaction_cur = _begin_transaction()

      pg_cat_send_signal(signal.SIGINT)
      time.sleep(1)

      try:
          conn, cur = connect_db(admin=True)
          cur.execute("SHOW DATABASES;")
          cleanup_conn(conn, cur)
      except psycopg2.OperationalError as e:
          raise Exception(e) from e

      cleanup_conn(transaction_conn, transaction_cur)
      pg_cat_send_signal(signal.SIGTERM)


  def _check_existing_admin_works_during_shutdown():
      """An admin connection opened before SIGINT must keep working after it."""
      pgcat_start()

      transaction_conn, transaction_cur = _begin_transaction()

      admin_conn, admin_cur = connect_db(admin=True)
      admin_cur.execute("SHOW DATABASES;")

      # Send sigint to pgcat while still in transaction
      pg_cat_send_signal(signal.SIGINT)
      time.sleep(1)

      try:
          admin_cur.execute("SHOW DATABASES;")
      except psycopg2.OperationalError as e:
          raise Exception("Could not execute admin command:", e) from e

      cleanup_conn(transaction_conn, transaction_cur)
      cleanup_conn(admin_conn, admin_cur)
      pg_cat_send_signal(signal.SIGTERM)


  def _check_shutdown_timeout_closes_transaction():
      """Open transaction + SIGINT: once the shutdown timeout passes, queries fail."""
      pgcat_start()

      # The open transaction should prevent shutdown until the timeout is reached
      conn, cur = _begin_transaction()

      pg_cat_send_signal(signal.SIGINT)

      # pgcat shutdown timeout is set to SHUTDOWN_TIMEOUT seconds, so we sleep
      # for SHUTDOWN_TIMEOUT + 1 seconds
      time.sleep(SHUTDOWN_TIMEOUT + 1)

      # After the timeout the server should be gone, so the query must fail
      _expect_query_rejected(cur, "Server not closed after sigint and expected timeout")

      cleanup_conn(conn, cur)
      pg_cat_send_signal(signal.SIGTERM)


  def test_shutdown_logic():
      """Exercise pgcat's shutdown behaviour, one scenario after another.

      Every scenario starts a fresh pgcat, triggers a shutdown (SIGINT or the
      admin ``SHUTDOWN`` command), checks how client and admin connections
      are treated, and finally sends SIGTERM.

      Raises:
          Exception: As soon as one scenario observes unexpected behaviour.
      """
      _check_idle_client_closed_by_sigint()
      _check_idle_client_closed_by_admin_shutdown()
      _check_transaction_survives_sigint()
      _check_transaction_survives_admin_shutdown()
      _check_new_client_rejected_during_shutdown()
      _check_new_admin_allowed_during_shutdown()
      _check_existing_admin_works_during_shutdown()
      _check_shutdown_timeout_closes_transaction()
  # Run every check in order; a failure surfaces as an uncaught exception.
  for _check in (test_normal_db_access, test_admin_db_access, test_shutdown_logic):
      _check()