# frozen_string_literal: true
require 'uri'
require_relative 'spec_helper'

describe "Query Mirroring" do
  # Single pgcat instance fronting the sharded_db pool (pool size 10).
  let(:processes) { Helpers::Pgcat.single_instance_setup("sharded_db", 10) }
  # Standalone Postgres instance used as the mirror target.
  let(:mirror_pg) { PgInstance.new(8432, "sharding_user", "sharding_user", "shard2") }
  let(:pgcat_conn_str) { processes.pgcat.connection_string("sharded_db", "sharding_user") }
  # Overridden per-context (e.g. "badhost") to simulate an unreachable mirror.
  let(:mirror_host) { "localhost" }

  before do
    # Register the same mirror three times so each client query is expected
    # to be transmitted to the mirror 3x.
    new_configs = processes.pgcat.current_config
    new_configs["pools"]["sharded_db"]["shards"]["0"]["mirrors"] = [
      [mirror_host, mirror_pg.port.to_s, "0"],
      [mirror_host, mirror_pg.port.to_s, "0"],
      [mirror_host, mirror_pg.port.to_s, "0"],
    ]
    processes.pgcat.update_config(new_configs)
    processes.pgcat.reload_config
  end

  after do
    processes.all_databases.map(&:reset)
    mirror_pg.reset
    processes.pgcat.shutdown
  end

  xit "can mirror a query" do
    conn = PG.connect(pgcat_conn_str)
    runs = 15
    runs.times { conn.async_exec("SELECT 1 + 2") }
    sleep 0.5 # mirroring is async; give the mirror connections time to flush
    expect(processes.all_databases.first.count_select_1_plus_2).to eq(runs)
    # Three mirror entries are configured, so the mirror sees each query 3x.
    expect(mirror_pg.count_select_1_plus_2).to eq(runs * 3)
  end

  context "when main server connection is closed" do
    it "closes the mirror connection" do
      baseline_count = processes.all_databases.first.count_connections
      5.times do |i|
        # Force pool cycling to detect zombie mirror connections
        new_configs = processes.pgcat.current_config
        new_configs["pools"]["sharded_db"]["idle_timeout"] = 5000 + i
        new_configs["pools"]["sharded_db"]["shards"]["0"]["mirrors"] = [
          [mirror_host, mirror_pg.port.to_s, "0"],
          [mirror_host, mirror_pg.port.to_s, "0"],
          [mirror_host, mirror_pg.port.to_s, "0"],
        ]
        processes.pgcat.update_config(new_configs)
        processes.pgcat.reload_config
      end
      conn = PG.connect(pgcat_conn_str)
      conn.async_exec("SELECT 1 + 2")
      sleep 0.5
      # Expect same number of connection even after pool cycling
      expect(processes.all_databases.first.count_connections).to be < baseline_count + 2
    end
  end

  xcontext "when mirror server goes down temporarily" do
    it "continues to transmit queries after recovery" do
      conn = PG.connect(pgcat_conn_str)
      # Query while the mirror is down; the main path must keep working.
      mirror_pg.take_down do
        conn.async_exec("SELECT 1 + 2")
        sleep 0.1
      end
      10.times { conn.async_exec("SELECT 1 + 2") }
      sleep 1 # allow mirror connections to recover and drain
      expect(mirror_pg.count_select_1_plus_2).to be >= 2
    end
  end

  context "when a mirror is down" do
    let(:mirror_host) { "badhost" }

    it "does not fail to send the main query" do
      conn = PG.connect(pgcat_conn_str)
      # No Errors here
      conn.async_exec("SELECT 1 + 2")
      expect(processes.all_databases.first.count_select_1_plus_2).to eq(1)
    end

    it "does not fail to send the main query (even after thousands of mirror attempts)" do
      conn = PG.connect(pgcat_conn_str)
      # No Errors here
      1000.times { conn.async_exec("SELECT 1 + 2") }
      expect(processes.all_databases.first.count_select_1_plus_2).to eq(1000)
    end
  end
end