|
- # frozen_string_literal: true
- require_relative 'spec_helper'
- describe "Miscellaneous" do
- let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }
- after do
- processes.all_databases.map(&:reset)
- processes.pgcat.shutdown
- end
- context "when adding then removing instance using RELOAD" do
- it "works correctly" do
- admin_conn = PG::connect(processes.pgcat.admin_connection_string)
- current_configs = processes.pgcat.current_config
- correct_count = current_configs["pools"]["sharded_db"]["shards"]["0"]["servers"].count
- expect(admin_conn.async_exec("SHOW DATABASES").count).to eq(correct_count)
- extra_replica = current_configs["pools"]["sharded_db"]["shards"]["0"]["servers"].last.clone
- extra_replica[0] = "127.0.0.1"
- current_configs["pools"]["sharded_db"]["shards"]["0"]["servers"] << extra_replica
- processes.pgcat.update_config(current_configs) # with replica added
- processes.pgcat.reload_config
- correct_count = current_configs["pools"]["sharded_db"]["shards"]["0"]["servers"].count
- expect(admin_conn.async_exec("SHOW DATABASES").count).to eq(correct_count)
- current_configs["pools"]["sharded_db"]["shards"]["0"]["servers"].pop
- processes.pgcat.update_config(current_configs) # with replica removed again
- processes.pgcat.reload_config
- correct_count = current_configs["pools"]["sharded_db"]["shards"]["0"]["servers"].count
- expect(admin_conn.async_exec("SHOW DATABASES").count).to eq(correct_count)
- end
- end
- context "when removing then adding instance back using RELOAD" do
- it "works correctly" do
- admin_conn = PG::connect(processes.pgcat.admin_connection_string)
- current_configs = processes.pgcat.current_config
- correct_count = current_configs["pools"]["sharded_db"]["shards"]["0"]["servers"].count
- expect(admin_conn.async_exec("SHOW DATABASES").count).to eq(correct_count)
- removed_replica = current_configs["pools"]["sharded_db"]["shards"]["0"]["servers"].pop
- processes.pgcat.update_config(current_configs) # with replica removed
- processes.pgcat.reload_config
- correct_count = current_configs["pools"]["sharded_db"]["shards"]["0"]["servers"].count
- expect(admin_conn.async_exec("SHOW DATABASES").count).to eq(correct_count)
- current_configs["pools"]["sharded_db"]["shards"]["0"]["servers"] << removed_replica
- processes.pgcat.update_config(current_configs) # with replica added again
- processes.pgcat.reload_config
- correct_count = current_configs["pools"]["sharded_db"]["shards"]["0"]["servers"].count
- expect(admin_conn.async_exec("SHOW DATABASES").count).to eq(correct_count)
- end
- end
- describe "TCP Keepalives" do
- # Ideally, we should block TCP traffic to the database using
- # iptables to mimic passive (connection is dropped without a RST packet)
- # but we cannot do this in CircleCI because iptables requires NET_ADMIN
- # capability that we cannot enable in CircleCI
- # Toxiproxy won't work either because it does not block keepalives
- # so our best bet is to query the OS keepalive params set on the socket
- context "default settings" do
- it "applies default keepalive settings" do
- # We query ss command to verify that we have correct keepalive values set
- # we can only verify the keepalives_idle parameter but that's good enough
- # example output
- #Recv-Q Send-Q Local Address:Port Peer Address:Port Process
- #0 0 127.0.0.1:60526 127.0.0.1:18432 timer:(keepalive,1min59sec,0)
- #0 0 127.0.0.1:60664 127.0.0.1:19432 timer:(keepalive,4.123ms,0)
- port_search_criteria = processes.all_databases.map { |d| "dport = :#{d.port}"}.join(" or ")
- results = `ss -t4 state established -o -at '( #{port_search_criteria} )'`.lines
- results.shift
- results.each { |line| expect(line).to match(/timer:\(keepalive,.*ms,0\)/) }
- end
- end
- context "changed settings" do
- it "applies keepalive settings from config" do
- new_configs = processes.pgcat.current_config
- new_configs["general"]["tcp_keepalives_idle"] = 120
- new_configs["general"]["tcp_keepalives_count"] = 1
- new_configs["general"]["tcp_keepalives_interval"] = 1
- processes.pgcat.update_config(new_configs)
- # We need to kill the old process that was using the default configs
- processes.pgcat.stop
- processes.pgcat.start
- processes.pgcat.wait_until_ready
- port_search_criteria = processes.all_databases.map { |d| "dport = :#{d.port}"}.join(" or ")
- results = `ss -t4 state established -o -at '( #{port_search_criteria} )'`.lines
- results.shift
- results.each { |line| expect(line).to include("timer:(keepalive,1min") }
- end
- end
- end
- describe "Extended Protocol handling" do
- it "does not send packets that client does not expect during extended protocol sequence" do
- new_configs = processes.pgcat.current_config
- new_configs["general"]["connect_timeout"] = 500
- new_configs["general"]["ban_time"] = 1
- new_configs["general"]["shutdown_timeout"] = 1
- new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
- processes.pgcat.update_config(new_configs)
- processes.pgcat.reload_config
- 25.times do
- Thread.new do
- conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
- conn.async_exec("SELECT pg_sleep(5)") rescue PG::SystemError
- ensure
- conn&.close
- end
- end
- sleep(0.5)
- conn_under_test = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
- stdout, stderr = with_captured_stdout_stderr do
- 15.times do |i|
- conn_under_test.async_exec("SELECT 1") rescue PG::SystemError
- conn_under_test.exec_params("SELECT #{i} + $1", [i]) rescue PG::SystemError
- sleep 1
- end
- end
- raise StandardError, "Libpq got unexpected messages while idle" if stderr.include?("arrived from server while idle")
- end
- end
- describe "Pool recycling after config reload" do
- let(:processes) { Helpers::Pgcat.three_shard_setup("sharded_db", 5) }
- it "should update pools for new clients and clients that are no longer in transaction" do
- server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
- server_conn.async_exec("BEGIN")
- # No config change yet, client should set old configs
- current_datebase_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"]
- expect(current_datebase_from_pg).to eq('shard0')
- # Swap shards
- new_config = processes.pgcat.current_config
- shard0 = new_config["pools"]["sharded_db"]["shards"]["0"]
- shard1 = new_config["pools"]["sharded_db"]["shards"]["1"]
- new_config["pools"]["sharded_db"]["shards"]["0"] = shard1
- new_config["pools"]["sharded_db"]["shards"]["1"] = shard0
- # Reload config
- processes.pgcat.update_config(new_config)
- processes.pgcat.reload_config
- sleep 0.5
- # Config changed but transaction is in progress, client should set old configs
- current_datebase_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"]
- expect(current_datebase_from_pg).to eq('shard0')
- server_conn.async_exec("COMMIT")
- # Transaction finished, client should get new configs
- current_datebase_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"]
- expect(current_datebase_from_pg).to eq('shard1')
- # New connection should get new configs
- server_conn.close()
- server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
- current_datebase_from_pg = server_conn.async_exec("SELECT current_database();")[0]["current_database"]
- expect(current_datebase_from_pg).to eq('shard1')
- end
- end
- describe "Clients closing connection in the middle of transaction" do
- it "sends a rollback to the server" do
- conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
- conn.async_exec("SET SERVER ROLE to 'primary'")
- conn.async_exec("BEGIN")
- conn.close
- expect(processes.primary.count_query("ROLLBACK")).to eq(1)
- end
- end
- describe "Server version reporting" do
- it "reports correct version for normal and admin databases" do
- server_conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
- expect(server_conn.server_version).not_to eq(0)
- server_conn.close
- admin_conn = PG::connect(processes.pgcat.admin_connection_string)
- expect(admin_conn.server_version).not_to eq(0)
- admin_conn.close
- end
- end
- describe "State clearance" do
- context "session mode" do
- let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "session") }
- it "Clears state before connection checkin" do
- # Both modes of operation should not raise
- # ERROR: prepared statement "prepared_q" already exists
- 15.times do
- conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
- conn.async_exec("PREPARE prepared_q (int) AS SELECT $1")
- conn.close
- end
- conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
- initial_value = conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]
- conn.async_exec("SET statement_timeout to 1000")
- current_value = conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]
- expect(conn.async_exec("SHOW statement_timeout")[0]["statement_timeout"]).to eq("1s")
- conn.close
- end
- it "Does not send DISCARD ALL unless necessary" do
- 10.times do
- conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
- conn.async_exec("SET SERVER ROLE to 'primary'")
- conn.async_exec("SELECT 1")
- conn.close
- end
- expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
- 10.times do
- conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
- conn.async_exec("SET SERVER ROLE to 'primary'")
- conn.async_exec("SELECT 1")
- conn.async_exec("SET statement_timeout to 5000")
- conn.close
- end
- expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
- end
- it "Resets server roles correctly" do
- 10.times do
- conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
- conn.async_exec("SET SERVER ROLE to 'primary'")
- conn.async_exec("SELECT 1")
- conn.async_exec("SET statement_timeout to 5000")
- conn.close
- end
- expect(processes.primary.count_query("RESET ROLE")).to eq(10)
- end
- end
- context "transaction mode" do
- let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction") }
- it "Clears state before connection checkin" do
- # Both modes of operation should not raise
- # ERROR: prepared statement "prepared_q" already exists
- 15.times do
- conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
- conn.async_exec("PREPARE prepared_q (int) AS SELECT $1")
- conn.close
- end
- 15.times do
- conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
- conn.prepare("prepared_q", "SELECT $1")
- conn.close
- end
- end
- it "Does not send DISCARD ALL unless necessary" do
- 10.times do
- conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
- conn.async_exec("SET SERVER ROLE to 'primary'")
- conn.async_exec("SELECT 1")
- conn.exec_params("SELECT $1", [1])
- conn.close
- end
- expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
- 10.times do
- conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
- conn.async_exec("SET SERVER ROLE to 'primary'")
- conn.async_exec("SELECT 1")
- conn.async_exec("SET statement_timeout to 5000")
- conn.close
- end
- expect(processes.primary.count_query("DISCARD ALL")).to eq(10)
- end
- end
- context "transaction mode with transactions" do
- let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5, "transaction") }
- it "Does not clear set statement state when declared in a transaction" do
- 10.times do
- conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
- conn.async_exec("SET SERVER ROLE to 'primary'")
- conn.async_exec("BEGIN")
- conn.async_exec("SET statement_timeout to 1000")
- conn.async_exec("COMMIT")
- conn.close
- end
- expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
- 10.times do
- conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
- conn.async_exec("SET SERVER ROLE to 'primary'")
- conn.async_exec("BEGIN")
- conn.async_exec("SET LOCAL statement_timeout to 1000")
- conn.async_exec("COMMIT")
- conn.close
- end
- expect(processes.primary.count_query("DISCARD ALL")).to eq(0)
- end
- end
- end
- describe "Idle client timeout" do
- context "idle transaction timeout set to 0" do
- before do
- current_configs = processes.pgcat.current_config
- correct_idle_client_transaction_timeout = current_configs["general"]["idle_client_in_transaction_timeout"]
- puts(current_configs["general"]["idle_client_in_transaction_timeout"])
-
- current_configs["general"]["idle_client_in_transaction_timeout"] = 0
-
- processes.pgcat.update_config(current_configs) # with timeout 0
- processes.pgcat.reload_config
- end
- it "Allow client to be idle in transaction" do
- conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
- conn.async_exec("BEGIN")
- conn.async_exec("SELECT 1")
- sleep(2)
- conn.async_exec("COMMIT")
- conn.close
- end
- end
- context "idle transaction timeout set to 500ms" do
- before do
- current_configs = processes.pgcat.current_config
- correct_idle_client_transaction_timeout = current_configs["general"]["idle_client_in_transaction_timeout"]
- current_configs["general"]["idle_client_in_transaction_timeout"] = 500
-
- processes.pgcat.update_config(current_configs) # with timeout 500
- processes.pgcat.reload_config
- end
- it "Allow client to be idle in transaction below timeout" do
- conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
- conn.async_exec("BEGIN")
- conn.async_exec("SELECT 1")
- sleep(0.4) # below 500ms
- conn.async_exec("COMMIT")
- conn.close
- end
- it "Error when client idle in transaction time exceeds timeout" do
- conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
- conn.async_exec("BEGIN")
- conn.async_exec("SELECT 1")
- sleep(1) # above 500ms
- expect{ conn.async_exec("COMMIT") }.to raise_error(PG::SystemError, /idle transaction timeout/)
- conn.async_exec("SELECT 1") # should be able to send another query
- conn.close
- end
- end
- end
- end
|