# misc_spec.rb

  # frozen_string_literal: true

require_relative 'spec_helper'

describe "Miscellaneous" do
  let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }

  after do
    processes.all_databases.each(&:reset)
    processes.pgcat.shutdown
  end

  # Opens a client connection through pgcat to the "sharded_db" pool as "sharding_user".
  def connect_as_sharding_user
    PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
  end

  # Opens a connection using pgcat's admin connection string.
  def connect_to_admin
    PG.connect(processes.pgcat.admin_connection_string)
  end

  # Returns the server list of shard "0" in the "sharded_db" pool of +config+.
  # This is the array stored inside +config+, so mutating it mutates +config+ as well.
  def shard0_servers(config)
    config.dig("pools", "sharded_db", "shards", "0", "servers")
  end

  context "when adding then removing instance using RELOAD" do
    it "works correctly" do
      admin_conn = connect_to_admin
      current_configs = processes.pgcat.current_config
      servers = shard0_servers(current_configs)
      expect(admin_conn.async_exec("SHOW DATABASES").count).to eq(servers.count)

      # Clone the last server entry with its first element (presumably the host) set to 127.0.0.1.
      extra_replica = servers.last.clone
      extra_replica[0] = "127.0.0.1"
      servers << extra_replica
      processes.pgcat.update_config(current_configs) # with replica added
      processes.pgcat.reload_config
      expect(admin_conn.async_exec("SHOW DATABASES").count).to eq(servers.count)

      servers.pop
      processes.pgcat.update_config(current_configs) # with replica removed again
      processes.pgcat.reload_config
      expect(admin_conn.async_exec("SHOW DATABASES").count).to eq(servers.count)

      admin_conn.close
    end
  end

  context "when removing then adding instance back using RELOAD" do
    it "works correctly" do
      admin_conn = connect_to_admin
      current_configs = processes.pgcat.current_config
      servers = shard0_servers(current_configs)
      expect(admin_conn.async_exec("SHOW DATABASES").count).to eq(servers.count)

      removed_replica = servers.pop
      processes.pgcat.update_config(current_configs) # with replica removed
      processes.pgcat.reload_config
      expect(admin_conn.async_exec("SHOW DATABASES").count).to eq(servers.count)

      servers << removed_replica
      processes.pgcat.update_config(current_configs) # with replica added again
      processes.pgcat.reload_config
      expect(admin_conn.async_exec("SHOW DATABASES").count).to eq(servers.count)

      admin_conn.close
    end
  end

  describe "TCP Keepalives" do
    # Ideally, we should block TCP traffic to the database using
    # iptables to mimic passive (connection is dropped without a RST packet)
    # but we cannot do this in CircleCI because iptables requires NET_ADMIN
    # capability that we cannot enable in CircleCI
    # Toxiproxy won't work either because it does not block keepalives
    # so our best bet is to query the OS keepalive params set on the socket

    # Returns the `ss` output lines (header row dropped) for established TCP/IPv4
    # sockets whose destination port belongs to one of the test databases.
    # NOTE(review): if no matching sockets exist, callers iterating over this list
    # assert nothing; consider also asserting the list is non-empty.
    def established_database_sockets
      port_search_criteria = processes.all_databases.map { |d| "dport = :#{d.port}" }.join(" or ")
      `ss -t4 state established -o -at '( #{port_search_criteria} )'`.lines.drop(1)
    end

    context "default settings" do
      it "applies default keepalive settings" do
        # We query ss command to verify that we have correct keepalive values set
        # we can only verify the keepalives_idle parameter but that's good enough
        # example output
        # Recv-Q Send-Q Local Address:Port Peer Address:Port Process
        # 0 0 127.0.0.1:60526 127.0.0.1:18432 timer:(keepalive,1min59sec,0)
        # 0 0 127.0.0.1:60664 127.0.0.1:19432 timer:(keepalive,4.123ms,0)
        established_database_sockets.each do |line|
          expect(line).to match(/timer:\(keepalive,.*ms,0\)/)
        end
      end
    end

    context "changed settings" do
      it "applies keepalive settings from config" do
        new_configs = processes.pgcat.current_config
        new_configs["general"]["tcp_keepalives_idle"] = 120
        new_configs["general"]["tcp_keepalives_count"] = 1
        new_configs["general"]["tcp_keepalives_interval"] = 1
        processes.pgcat.update_config(new_configs)

        # We need to kill the old process that was using the default configs
        processes.pgcat.stop
        processes.pgcat.start
        processes.pgcat.wait_until_ready

        established_database_sockets.each do |line|
          expect(line).to include("timer:(keepalive,1min")
        end
      end
    end
  end

  describe "Extended Protocol handling" do
    # Runs the given block and swallows errors raised by the pg driver.
    # Query failures are tolerated in this test; it only checks that libpq
    # never reports unexpected messages from the server.
    def ignoring_pg_errors
      yield
    rescue PG::Error
      nil
    end

    it "does not send packets that client does not expect during extended protocol sequence" do
      new_configs = processes.pgcat.current_config
      new_configs["general"]["connect_timeout"] = 500
      new_configs["general"]["ban_time"] = 1
      new_configs["general"]["shutdown_timeout"] = 1
      new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
      processes.pgcat.update_config(new_configs)
      processes.pgcat.reload_config

      # Many clients running long queries against a pool of size 1, so the
      # connection under test has to compete with them for the server connection.
      background_clients = Array.new(25) do
        Thread.new do
          conn = connect_as_sharding_user
          conn.async_exec("SELECT pg_sleep(5)")
        rescue PG::Error
          nil
        ensure
          conn&.close
        end
      end
      sleep(0.5)

      conn_under_test = connect_as_sharding_user
      _stdout, stderr = with_captured_stdout_stderr do
        15.times do |i|
          ignoring_pg_errors { conn_under_test.async_exec("SELECT 1") }
          ignoring_pg_errors { conn_under_test.exec_params("SELECT #{i} + $1", [i]) }
          sleep 1
        end
      end
      conn_under_test.close
      # Bounded wait so the background clients do not outlive the example and race the `after` hook.
      background_clients.each { |client| client.join(10) }

      expect(stderr).not_to include("arrived from server while idle"), "Libpq got unexpected messages while idle"
    end
  end

  describe "Pool recycling after config reload" do
    let(:processes) { Helpers::Pgcat.three_shard_setup("sharded_db", 5) }

    # Returns the result of `SELECT current_database()` run over +conn+.
    def current_database(conn)
      conn.async_exec("SELECT current_database();")[0]["current_database"]
    end

    it "should update pools for new clients and clients that are no longer in transaction" do
      client_conn = connect_as_sharding_user
      client_conn.async_exec("BEGIN")
      # No config change yet, client should see the old config
      expect(current_database(client_conn)).to eq('shard0')

      # Swap shards 0 and 1
      new_config = processes.pgcat.current_config
      shards = new_config["pools"]["sharded_db"]["shards"]
      shards["0"], shards["1"] = shards["1"], shards["0"]

      # Reload config
      processes.pgcat.update_config(new_config)
      processes.pgcat.reload_config
      sleep 0.5

      # Config changed but transaction is in progress, client should still see the old config
      expect(current_database(client_conn)).to eq('shard0')
      client_conn.async_exec("COMMIT")

      # Transaction finished, client should get the new config
      expect(current_database(client_conn)).to eq('shard1')

      # New connection should get the new config
      client_conn.close
      client_conn = connect_as_sharding_user
      expect(current_database(client_conn)).to eq('shard1')
      client_conn.close
    end
  end

  describe "Clients closing connection in the middle of transaction" do
    it "sends a rollback to the server" do
      conn = connect_as_sharding_user
      conn.async_exec("SET SERVER ROLE to 'primary'")
      conn.async_exec("BEGIN")
      conn.close

      expect(processes.primary.count_query("ROLLBACK")).to eq(1)
    end
  end

  describe "Server version reporting" do
    it "reports correct version for normal and admin databases" do
      server_conn = connect_as_sharding_user
      expect(server_conn.server_version).not_to eq(0)
      server_conn.close

      admin_conn = connect_to_admin
      expect(admin_conn.server_version).not_to eq(0)
      admin_conn.close
    end
  end

  describe "State clearance" do
    # Returns the result of `SHOW statement_timeout` run over +conn+.
    def show_statement_timeout(conn)
      conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]
    end

    context "session mode" do
      let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "session") }

      it "Clears state before connection checkin" do
        # Both modes of operation should not raise
        # ERROR: prepared statement "prepared_q" already exists
        15.times do
          conn = connect_as_sharding_user
          conn.async_exec("PREPARE prepared_q (int) AS SELECT $1")
          conn.close
        end

        conn = connect_as_sharding_user
        initial_value = show_statement_timeout(conn)
        conn.async_exec("SET statement_timeout to 1000")
        expect(show_statement_timeout(conn)).to eq("1s")
        conn.close

        # A new client must not inherit the SET issued by the previous session
        # (session mode sends DISCARD ALL on checkin after a SET, see the next example).
        conn = connect_as_sharding_user
        expect(show_statement_timeout(conn)).to eq(initial_value)
        conn.close
      end

      it "Does not send DISCARD ALL unless necessary" do
        10.times do
          conn = connect_as_sharding_user
          conn.async_exec("SET SERVER ROLE to 'primary'")
          conn.async_exec("SELECT 1")
          conn.close
        end
        expect(processes.primary.count_query("DISCARD ALL")).to eq(0)

        10.times do
          conn = connect_as_sharding_user
          conn.async_exec("SET SERVER ROLE to 'primary'")
          conn.async_exec("SELECT 1")
          conn.async_exec("SET statement_timeout to 5000")
          conn.close
        end
        expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
      end

      it "Resets server roles correctly" do
        10.times do
          conn = connect_as_sharding_user
          conn.async_exec("SET SERVER ROLE to 'primary'")
          conn.async_exec("SELECT 1")
          conn.async_exec("SET statement_timeout to 5000")
          conn.close
        end
        expect(processes.primary.count_query("RESET ROLE")).to eq(10)
      end
    end

    context "transaction mode" do
      let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction") }

      it "Clears state before connection checkin" do
        # Both modes of operation should not raise
        # ERROR: prepared statement "prepared_q" already exists
        15.times do
          conn = connect_as_sharding_user
          conn.async_exec("PREPARE prepared_q (int) AS SELECT $1")
          conn.close
        end

        15.times do
          conn = connect_as_sharding_user
          conn.prepare("prepared_q", "SELECT $1")
          conn.close
        end
      end

      it "Does not send DISCARD ALL unless necessary" do
        10.times do
          conn = connect_as_sharding_user
          conn.async_exec("SET SERVER ROLE to 'primary'")
          conn.async_exec("SELECT 1")
          conn.exec_params("SELECT $1", [1])
          conn.close
        end
        expect(processes.primary.count_query("DISCARD ALL")).to eq(0)

        10.times do
          conn = connect_as_sharding_user
          conn.async_exec("SET SERVER ROLE to 'primary'")
          conn.async_exec("SELECT 1")
          conn.async_exec("SET statement_timeout to 5000")
          conn.close
        end
        expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
      end
    end

    context "transaction mode with transactions" do
      let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction") }

      it "Does not clear set statement state when declared in a transaction" do
        10.times do
          conn = connect_as_sharding_user
          conn.async_exec("SET SERVER ROLE to 'primary'")
          conn.async_exec("BEGIN")
          conn.async_exec("SET statement_timeout to 1000")
          conn.async_exec("COMMIT")
          conn.close
        end
        expect(processes.primary.count_query("DISCARD ALL")).to eq(0)

        10.times do
          conn = connect_as_sharding_user
          conn.async_exec("SET SERVER ROLE to 'primary'")
          conn.async_exec("BEGIN")
          conn.async_exec("SET LOCAL statement_timeout to 1000")
          conn.async_exec("COMMIT")
          conn.close
        end
        expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
      end
    end
  end

  describe "Idle client timeout" do
    # Writes +timeout+ into pgcat's general.idle_client_in_transaction_timeout
    # setting and reloads the config.
    def reload_with_idle_transaction_timeout(timeout)
      config = processes.pgcat.current_config
      config["general"]["idle_client_in_transaction_timeout"] = timeout
      processes.pgcat.update_config(config)
      processes.pgcat.reload_config
    end

    context "idle transaction timeout set to 0" do
      before { reload_with_idle_transaction_timeout(0) }

      it "Allow client to be idle in transaction" do
        conn = connect_as_sharding_user
        conn.async_exec("BEGIN")
        conn.async_exec("SELECT 1")
        sleep(2)
        conn.async_exec("COMMIT")
        conn.close
      end
    end

    context "idle transaction timeout set to 500ms" do
      before { reload_with_idle_transaction_timeout(500) }

      it "Allow client to be idle in transaction below timeout" do
        conn = connect_as_sharding_user
        conn.async_exec("BEGIN")
        conn.async_exec("SELECT 1")
        sleep(0.4) # below 500ms
        conn.async_exec("COMMIT")
        conn.close
      end

      it "Error when client idle in transaction time exceeds timeout" do
        conn = connect_as_sharding_user
        conn.async_exec("BEGIN")
        conn.async_exec("SELECT 1")
        sleep(1) # above 500ms
        expect { conn.async_exec("COMMIT") }.to raise_error(PG::SystemError, /idle transaction timeout/)
        conn.async_exec("SELECT 1") # should be able to send another query
        conn.close
      end
    end
  end
end