load_balancing_spec.rb 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. # frozen_string_literal: true
  2. require_relative 'spec_helper'
  # Checks how "SELECT 1 + 2" traffic sent through pgcat is distributed over
  # the databases created by Helpers::Pgcat.single_shard_setup("sharded_db", 5).
  # NOTE(review): QUERY_COUNT and MARGIN_OF_ERROR are not defined in this file;
  # presumably they come from spec_helper -- confirm.
  # NOTE(review): the meaning of the second setup argument (5) is not visible
  # here; presumably the pool size -- confirm against the helper.
  describe "Random Load Balancing" do
    let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }

    # Call reset on every database, then shut pgcat down, after each example.
    after do
      processes.all_databases.each(&:reset)
      processes.pgcat.shutdown
    end

    context "under regular circumstances" do
      it "balances query volume between all instances" do
        conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
        query_count = QUERY_COUNT
        # Integer division: the per-database count under a perfectly even split.
        expected_share = query_count / processes.all_databases.count
        failed_count = 0

        query_count.times do
          conn.async_exec("SELECT 1 + 2")
        rescue
          failed_count += 1
        end

        expect(failed_count).to eq(0)
        processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
          expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
        end
      end
    end

    context "when some replicas are down" do
      it "balances query volume between working instances" do
        conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
        # The first two replicas are taken down for the duration of the query loop.
        downed_replicas = processes.replicas[0..1]
        expected_share = QUERY_COUNT / (processes.all_databases.count - downed_replicas.size)
        failed_count = 0

        downed_replicas[0].take_down do
          downed_replicas[1].take_down do
            QUERY_COUNT.times do
              conn.async_exec("SELECT 1 + 2")
            rescue
              # Reconnect after a failed query so the remaining iterations can proceed.
              conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
              failed_count += 1
            end
          end
        end

        # Same bound as the equivalent example for the other balancing mode:
        # at most one failed query per downed replica.
        # NOTE(review): this assertion was previously missing here; confirm the
        # bound also holds for this setup.
        expect(failed_count).to be <= 2

        processes.all_databases.each do |instance|
          queries_routed = instance.count_select_1_plus_2
          if downed_replicas.include?(instance)
            expect(queries_routed).to eq(0)
          else
            expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
          end
        end
      end
    end
  end
  # Checks how "SELECT 1 + 2" traffic sent through pgcat is distributed over
  # the databases created by
  # Helpers::Pgcat.single_shard_setup("sharded_db", 1, "transaction", "loc").
  # NOTE(review): QUERY_COUNT and MARGIN_OF_ERROR are not defined in this file;
  # presumably they come from spec_helper -- confirm.
  # NOTE(review): the extra setup arguments are presumably pool size, pool mode
  # and load-balancing mode -- confirm against the helper.
  describe "Least Outstanding Queries Load Balancing" do
    let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 1, "transaction", "loc") }

    # Call reset on every database, then shut pgcat down, after each example.
    after do
      processes.all_databases.each(&:reset)
      processes.pgcat.shutdown
    end

    context "under homogeneous load" do
      it "balances query volume between all instances" do
        conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
        query_count = QUERY_COUNT
        # Integer division: the per-database count under a perfectly even split.
        expected_share = query_count / processes.all_databases.count
        failed_count = 0

        query_count.times do
          conn.async_exec("SELECT 1 + 2")
        rescue
          failed_count += 1
        end

        expect(failed_count).to eq(0)
        processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
          expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
        end
      end
    end

    context "under heterogeneous load" do
      # Pending example (xit): it is skipped by RSpec until re-enabled.
      xit "balances query volume between all instances based on how busy they are" do
        slow_query_count = 2
        # Each thread opens its own connection and issues BEGIN on it.
        # NOTE(review): the threads are only joined at the very end, so the
        # quick queries below may start before every BEGIN has been sent.
        threads = Array.new(slow_query_count) do
          Thread.new do
            conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
            conn.async_exec("BEGIN")
          end
        end

        conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
        query_count = QUERY_COUNT
        expected_share = query_count / (processes.all_databases.count - slow_query_count)
        failed_count = 0

        query_count.times do
          conn.async_exec("SELECT 1 + 2")
        rescue
          failed_count += 1
        end

        expect(failed_count).to eq(0)

        # Take one snapshot of the per-instance counts for both checks below.
        instance_shares = processes.all_databases.map(&:count_select_1_plus_2)

        # Under LOQ, we expect the instances kept busy by the BEGIN
        # connections to get no selects
        expect(instance_shares.count(&:zero?)).to eq(slow_query_count)

        # We also expect the quick queries to be spread across
        # the idle servers only
        instance_shares.reject(&:zero?).each do |instance_share|
          expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
        end

        threads.each(&:join)
      end
    end

    context "when some replicas are down" do
      it "balances query volume between working instances" do
        conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
        # The first two replicas are taken down for the duration of the query loop.
        downed_replicas = processes.replicas[0..1]
        expected_share = QUERY_COUNT / (processes.all_databases.count - downed_replicas.size)
        failed_count = 0

        downed_replicas[0].take_down do
          downed_replicas[1].take_down do
            QUERY_COUNT.times do
              conn.async_exec("SELECT 1 + 2")
            rescue
              # Reconnect after a failed query so the remaining iterations can proceed.
              conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
              failed_count += 1
            end
          end
        end

        # At most one failed query is tolerated per downed replica.
        expect(failed_count).to be <= 2

        processes.all_databases.each do |instance|
          queries_routed = instance.count_select_1_plus_2
          if downed_replicas.include?(instance)
            expect(queries_routed).to eq(0)
          else
            expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
          end
        end
      end
    end
  end