123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413 |
- # frozen_string_literal: true
- require 'uri'
- require_relative 'spec_helper'
- describe "Admin" do
- let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) }
- let(:pgcat_conn_str) { processes.pgcat.connection_string("sharded_db", "sharding_user") }
- after do
- processes.all_databases.map(&:reset)
- processes.pgcat.shutdown
- end
- describe "SHOW STATS" do
- context "clients connect and make one query" do
- it "updates *_query_time and *_wait_time" do
- connections = Array.new(3) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
- connections.each do |c|
- Thread.new { c.async_exec("SELECT pg_sleep(0.25)") }
- end
- sleep(1)
- connections.map(&:close)
- # wait for averages to be calculated, we shouldn't do this too often
- sleep(15.5)
- admin_conn = PG::connect(processes.pgcat.admin_connection_string)
- results = admin_conn.async_exec("SHOW STATS")[0]
- admin_conn.close
- expect(results["total_query_time"].to_i).to be_within(200).of(750)
- expect(results["avg_query_time"].to_i).to be_within(20).of(50)
- expect(results["total_wait_time"].to_i).to_not eq(0)
- expect(results["avg_wait_time"].to_i).to_not eq(0)
- end
- end
- end
- describe "SHOW POOLS" do
- context "bad credentials" do
- it "does not change any stats" do
- bad_password_url = URI(pgcat_conn_str)
- bad_password_url.password = "wrong"
- expect { PG::connect("#{bad_password_url.to_s}?application_name=bad_password") }.to raise_error(PG::ConnectionBad)
- sleep(1)
- admin_conn = PG::connect(processes.pgcat.admin_connection_string)
- results = admin_conn.async_exec("SHOW POOLS")[0]
- %w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
- raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
- end
- expect(results["sv_idle"]).to eq("1")
- end
- end
- context "bad database name" do
- it "does not change any stats" do
- bad_db_url = URI(pgcat_conn_str)
- bad_db_url.path = "/wrong_db"
- expect { PG::connect("#{bad_db_url.to_s}?application_name=bad_db") }.to raise_error(PG::ConnectionBad)
- sleep(1)
- admin_conn = PG::connect(processes.pgcat.admin_connection_string)
- results = admin_conn.async_exec("SHOW POOLS")[0]
- %w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
- raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
- end
- expect(results["sv_idle"]).to eq("1")
- end
- end
- context "client connects but issues no queries" do
- it "only affects cl_idle stats" do
- admin_conn = PG::connect(processes.pgcat.admin_connection_string)
- before_test = admin_conn.async_exec("SHOW POOLS")[0]["sv_idle"]
- connections = Array.new(20) { PG::connect(pgcat_conn_str) }
- sleep(1)
- results = admin_conn.async_exec("SHOW POOLS")[0]
- %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
- raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
- end
- expect(results["cl_idle"]).to eq("20")
- expect(results["sv_idle"]).to eq(before_test)
- connections.map(&:close)
- sleep(1.1)
- results = admin_conn.async_exec("SHOW POOLS")[0]
- %w[cl_active cl_idle cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
- raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
- end
- expect(results["sv_idle"]).to eq(before_test)
- end
- end
- context "clients connect and make one query" do
- it "only affects cl_idle, sv_idle stats" do
- connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
- connections.each do |c|
- Thread.new { c.async_exec("SELECT pg_sleep(2.5)") }
- end
- sleep(1.1)
- admin_conn = PG::connect(processes.pgcat.admin_connection_string)
- results = admin_conn.async_exec("SHOW POOLS")[0]
- %w[cl_idle cl_waiting cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s|
- raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
- end
- expect(results["cl_active"]).to eq("5")
- expect(results["sv_active"]).to eq("5")
- sleep(3)
- results = admin_conn.async_exec("SHOW POOLS")[0]
- %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
- raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
- end
- expect(results["cl_idle"]).to eq("5")
- expect(results["sv_idle"]).to eq("5")
- connections.map(&:close)
- sleep(1)
- results = admin_conn.async_exec("SHOW POOLS")[0]
- %w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
- raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
- end
- expect(results["sv_idle"]).to eq("5")
- end
- end
- context "client connects and opens a transaction and closes connection uncleanly" do
- it "produces correct statistics" do
- connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
- connections.each do |c|
- Thread.new do
- c.async_exec("BEGIN")
- c.async_exec("SELECT pg_sleep(0.01)")
- c.close
- end
- end
- sleep(1.1)
- admin_conn = PG::connect(processes.pgcat.admin_connection_string)
- results = admin_conn.async_exec("SHOW POOLS")[0]
- %w[cl_idle cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
- raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
- end
- expect(results["sv_idle"]).to eq("5")
- end
- end
- context "client fail to checkout connection from the pool" do
- it "counts clients as idle" do
- new_configs = processes.pgcat.current_config
- new_configs["general"]["connect_timeout"] = 500
- new_configs["general"]["ban_time"] = 1
- new_configs["general"]["shutdown_timeout"] = 1
- new_configs["pools"]["sharded_db"]["users"]["0"]["pool_size"] = 1
- processes.pgcat.update_config(new_configs)
- processes.pgcat.reload_config
- threads = []
- connections = Array.new(5) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
- connections.each do |c|
- threads << Thread.new { c.async_exec("SELECT pg_sleep(1)") rescue PG::SystemError }
- end
- sleep(2)
- admin_conn = PG::connect(processes.pgcat.admin_connection_string)
- results = admin_conn.async_exec("SHOW POOLS")[0]
- %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login maxwait].each do |s|
- raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
- end
- expect(results["cl_idle"]).to eq("5")
- expect(results["sv_idle"]).to eq("1")
- threads.map(&:join)
- connections.map(&:close)
- end
- end
- context "clients connects and disconnect normally" do
- let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) }
- it 'shows the same number of clients before and after' do
- clients_before = clients_connected_to_pool(processes: processes)
- threads = []
- connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
- connections.each do |c|
- threads << Thread.new { c.async_exec("SELECT 1") }
- end
- clients_between = clients_connected_to_pool(processes: processes)
- expect(clients_before).not_to eq(clients_between)
- connections.each(&:close)
- clients_after = clients_connected_to_pool(processes: processes)
- expect(clients_before).to eq(clients_after)
- end
- end
- context "clients connects and disconnect abruptly" do
- let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) }
- it 'shows the same number of clients before and after' do
- threads = []
- connections = Array.new(2) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
- connections.each do |c|
- threads << Thread.new { c.async_exec("SELECT 1") }
- end
- clients_before = clients_connected_to_pool(processes: processes)
- random_string = (0...8).map { (65 + rand(26)).chr }.join
- connection_string = "#{pgcat_conn_str}?application_name=#{random_string}"
- faulty_client = Process.spawn("psql -Atx #{connection_string} >/dev/null")
- sleep(1)
- # psql starts two processes, we only know the pid of the parent, this
- # ensure both are killed
- `pkill -9 -f '#{random_string}'`
- Process.wait(faulty_client)
- clients_after = clients_connected_to_pool(processes: processes)
- expect(clients_before).to eq(clients_after)
- end
- end
- context "clients overwhelm server pools" do
- let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 2) }
- it "cl_waiting is updated to show it" do
- threads = []
- connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
- connections.each do |c|
- threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") }
- end
- sleep(1.1) # Allow time for stats to update
- admin_conn = PG::connect(processes.pgcat.admin_connection_string)
- results = admin_conn.async_exec("SHOW POOLS")[0]
- %w[cl_idle cl_cancel_req sv_idle sv_used sv_tested sv_login maxwait].each do |s|
- raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
- end
- expect(results["cl_waiting"]).to eq("2")
- expect(results["cl_active"]).to eq("2")
- expect(results["sv_active"]).to eq("2")
- sleep(2.5) # Allow time for stats to update
- results = admin_conn.async_exec("SHOW POOLS")[0]
- %w[cl_active cl_waiting cl_cancel_req sv_active sv_used sv_tested sv_login].each do |s|
- raise StandardError, "Field #{s} was expected to be 0 but found to be #{results[s]}" if results[s] != "0"
- end
- expect(results["cl_idle"]).to eq("4")
- expect(results["sv_idle"]).to eq("2")
- threads.map(&:join)
- connections.map(&:close)
- end
- it "show correct max_wait" do
- threads = []
- connections = Array.new(4) { PG::connect("#{pgcat_conn_str}?application_name=one_query") }
- connections.each do |c|
- threads << Thread.new { c.async_exec("SELECT pg_sleep(1.5)") }
- end
- sleep(2.5) # Allow time for stats to update
- admin_conn = PG::connect(processes.pgcat.admin_connection_string)
- results = admin_conn.async_exec("SHOW POOLS")[0]
- expect(results["maxwait"]).to eq("1")
- expect(results["maxwait_us"].to_i).to be_within(200_000).of(500_000)
- sleep(4.5) # Allow time for stats to update
- results = admin_conn.async_exec("SHOW POOLS")[0]
- expect(results["maxwait"]).to eq("0")
- threads.map(&:join)
- connections.map(&:close)
- end
- end
- end
- describe "SHOW CLIENTS" do
- it "reports correct number and application names" do
- conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user")
- connections = Array.new(20) { |i| PG::connect("#{conn_str}?application_name=app#{i % 5}") }
- admin_conn = PG::connect(processes.pgcat.admin_connection_string)
- sleep(1) # Wait for stats to be updated
- results = admin_conn.async_exec("SHOW CLIENTS")
- expect(results.count).to eq(21) # count admin clients
- expect(results.select { |c| c["application_name"] == "app3" || c["application_name"] == "app4" }.count).to eq(8)
- expect(results.select { |c| c["database"] == "pgcat" }.count).to eq(1)
- connections[0..5].map(&:close)
- sleep(1) # Wait for stats to be updated
- results = admin_conn.async_exec("SHOW CLIENTS")
- expect(results.count).to eq(15)
- connections[6..].map(&:close)
- sleep(1) # Wait for stats to be updated
- expect(admin_conn.async_exec("SHOW CLIENTS").count).to eq(1)
- admin_conn.close
- end
- it "reports correct number of queries and transactions" do
- conn_str = processes.pgcat.connection_string("sharded_db", "sharding_user")
- connections = Array.new(2) { |i| PG::connect("#{conn_str}?application_name=app#{i}") }
- connections.each do |c|
- c.async_exec("SELECT 1")
- c.async_exec("SELECT 2")
- c.async_exec("SELECT 3")
- c.async_exec("BEGIN")
- c.async_exec("SELECT 4")
- c.async_exec("SELECT 5")
- c.async_exec("COMMIT")
- end
- admin_conn = PG::connect(processes.pgcat.admin_connection_string)
- sleep(1) # Wait for stats to be updated
- results = admin_conn.async_exec("SHOW CLIENTS")
- expect(results.count).to eq(3)
- normal_client_results = results.reject { |r| r["database"] == "pgcat" }
- expect(normal_client_results[0]["transaction_count"]).to eq("4")
- expect(normal_client_results[1]["transaction_count"]).to eq("4")
- expect(normal_client_results[0]["query_count"]).to eq("7")
- expect(normal_client_results[1]["query_count"]).to eq("7")
- admin_conn.close
- connections.map(&:close)
- end
- end
- describe "Manual Banning" do
- let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 10) }
- before do
- new_configs = processes.pgcat.current_config
- # Prevent immediate unbanning when we ban localhost
- new_configs["pools"]["sharded_db"]["shards"]["0"]["servers"][0][0] = "127.0.0.1"
- new_configs["pools"]["sharded_db"]["shards"]["0"]["servers"][1][0] = "127.0.0.1"
- processes.pgcat.update_config(new_configs)
- processes.pgcat.reload_config
- end
- describe "BAN/UNBAN and SHOW BANS" do
- it "bans/unbans hosts" do
- admin_conn = PG::connect(processes.pgcat.admin_connection_string)
- # Returns a list of the banned addresses
- results = admin_conn.async_exec("BAN localhost 10").to_a
- expect(results.count).to eq(2)
- expect(results.map{ |r| r["host"] }.uniq).to eq(["localhost"])
- # Subsequent calls should yield no results
- results = admin_conn.async_exec("BAN localhost 10").to_a
- expect(results.count).to eq(0)
- results = admin_conn.async_exec("SHOW BANS").to_a
- expect(results.count).to eq(2)
- expect(results.map{ |r| r["host"] }.uniq).to eq(["localhost"])
- # Returns a list of the unbanned addresses
- results = admin_conn.async_exec("UNBAN localhost").to_a
- expect(results.count).to eq(2)
- expect(results.map{ |r| r["host"] }.uniq).to eq(["localhost"])
- # Subsequent calls should yield no results
- results = admin_conn.async_exec("UNBAN localhost").to_a
- expect(results.count).to eq(0)
- results = admin_conn.async_exec("SHOW BANS").to_a
- expect(results.count).to eq(0)
- end
- it "honors ban duration" do
- admin_conn = PG::connect(processes.pgcat.admin_connection_string)
- # Returns a list of the banned addresses
- results = admin_conn.async_exec("BAN localhost 1").to_a
- expect(results.count).to eq(2)
- expect(results.map{ |r| r["host"] }.uniq).to eq(["localhost"])
- sleep(2)
- # After 2 seconds the ban should be lifted
- results = admin_conn.async_exec("SHOW BANS").to_a
- expect(results.count).to eq(0)
- end
- it "can handle bad input" do
- admin_conn = PG::connect(processes.pgcat.admin_connection_string)
- expect { admin_conn.async_exec("BAN").to_a }.to raise_error(PG::SystemError)
- expect { admin_conn.async_exec("BAN a").to_a }.to raise_error(PG::SystemError)
- expect { admin_conn.async_exec("BAN a a").to_a }.to raise_error(PG::SystemError)
- expect { admin_conn.async_exec("BAN a -5").to_a }.to raise_error(PG::SystemError)
- expect { admin_conn.async_exec("BAN a 0").to_a }.to raise_error(PG::SystemError)
- expect { admin_conn.async_exec("BAN a a a").to_a }.to raise_error(PG::SystemError)
- expect { admin_conn.async_exec("UNBAN").to_a }.to raise_error(PG::SystemError)
- end
- end
- end
- describe "SHOW users" do
- it "returns the right users" do
- admin_conn = PG::connect(processes.pgcat.admin_connection_string)
- results = admin_conn.async_exec("SHOW USERS")[0]
- admin_conn.close
- expect(results["name"]).to eq("sharding_user")
- expect(results["pool_mode"]).to eq("transaction")
- end
- end
- end
|