123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- # frozen_string_literal: true
- require_relative 'spec_helper'
# Specs for the default ("random") load-balancing behaviour: a fixed number of
# "SELECT 1 + 2" queries is sent through pgcat and the per-database counters
# reported by `count_select_1_plus_2` are compared against an even split.
#
# QUERY_COUNT and MARGIN_OF_ERROR are constants defined outside this file
# section (presumably in spec_helper — confirm).
describe "Random Load Balancing" do
  # NOTE(review): the second argument (5) is presumably the pool size — confirm
  # against Helpers::Pgcat.single_shard_setup.
  let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }

  after do
    # `each`, not `map`: the calls are made purely for their side effects.
    processes.all_databases.each(&:reset)
    processes.pgcat.shutdown
  end

  context "under regular circumstances" do
    it "balances query volume between all instances" do
      conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
      # Integer division: the share every database would get under a perfectly even split.
      expected_share = QUERY_COUNT / processes.all_databases.count
      failed_count = 0

      QUERY_COUNT.times do
        conn.async_exec("SELECT 1 + 2")
      rescue
        failed_count += 1
      end

      # With every instance up, no query is allowed to fail.
      expect(failed_count).to eq(0)
      processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
        expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
      end
    end
  end

  context "when some replicas are down" do
    it "balances query volume between working instances" do
      conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
      # Two replicas are taken down below, so the load is split across the rest.
      expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
      failed_count = 0

      processes[:replicas][0].take_down do
        processes[:replicas][1].take_down do
          QUERY_COUNT.times do
            conn.async_exec("SELECT 1 + 2")
          rescue
            # Reconnect after a failed query so the remaining iterations can
            # still run, and record the failure.
            conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
            failed_count += 1
          end
        end
      end

      # Previously the counter was collected but never checked, so any number
      # of failed queries went unnoticed. Use the same bound as the equivalent
      # "replicas down" example for the least-outstanding-queries mode: at most
      # one failure per replica that was taken down.
      expect(failed_count).to be <= 2

      processes.all_databases.each do |instance|
        queries_routed = instance.count_select_1_plus_2
        if processes.replicas[0..1].include?(instance)
          # The two replicas that were down must not have served any query.
          expect(queries_routed).to eq(0)
        else
          expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
        end
      end
    end
  end
end
# Specs for the alternative load-balancing mode selected by the "loc" argument
# below: a fixed number of "SELECT 1 + 2" queries is sent through pgcat and the
# per-database counters reported by `count_select_1_plus_2` are compared
# against the expected split.
#
# QUERY_COUNT and MARGIN_OF_ERROR are constants defined outside this file
# section (presumably in spec_helper — confirm).
describe "Least Outstanding Queries Load Balancing" do
  # NOTE(review): the arguments are presumably pool size (1), pool mode
  # ("transaction") and load-balancing mode ("loc") — confirm against
  # Helpers::Pgcat.single_shard_setup.
  let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 1, "transaction", "loc") }

  after do
    # `each`, not `map`: the calls are made purely for their side effects.
    processes.all_databases.each(&:reset)
    processes.pgcat.shutdown
  end

  context "under homogeneous load" do
    it "balances query volume between all instances" do
      conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
      # Integer division: the share every database would get under a perfectly even split.
      expected_share = QUERY_COUNT / processes.all_databases.count
      failed_count = 0

      QUERY_COUNT.times do
        conn.async_exec("SELECT 1 + 2")
      rescue
        failed_count += 1
      end

      # With every instance up, no query is allowed to fail.
      expect(failed_count).to eq(0)
      processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|
        expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
      end
    end
  end

  context "under heterogeneous load" do
    # Pending example (`xit`).
    # NOTE(review): the background threads are only joined after the
    # assertions, so nothing here guarantees that the BEGIN statements have
    # been issued before the quick queries start — confirm whether this is why
    # the example is disabled.
    xit "balances query volume between all instances based on how busy they are" do
      slow_query_count = 2
      # Each thread opens its own connection and leaves a transaction open on it.
      threads = Array.new(slow_query_count) do
        Thread.new do
          conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
          conn.async_exec("BEGIN")
        end
      end

      conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
      # The instances kept busy by the open transactions are excluded from the split.
      expected_share = QUERY_COUNT / (processes.all_databases.count - slow_query_count)
      failed_count = 0

      QUERY_COUNT.times do
        conn.async_exec("SELECT 1 + 2")
      rescue
        failed_count += 1
      end

      expect(failed_count).to eq(0)

      # Read the counters once so both assertions below see the same snapshot.
      instance_shares = processes.all_databases.map(&:count_select_1_plus_2)

      # Under LOQ, we expect the instances held busy by the open
      # transactions to get no selects
      expect(instance_shares.count(&:zero?)).to eq(slow_query_count)

      # We also expect the quick queries to be spread across
      # the idle servers only
      instance_shares.reject(&:zero?).each do |instance_share|
        expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
      end

      threads.each(&:join)
    end
  end

  context "when some replicas are down" do
    it "balances query volume between working instances" do
      conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
      # Two replicas are taken down below, so the load is split across the rest.
      expected_share = QUERY_COUNT / (processes.all_databases.count - 2)
      failed_count = 0

      processes[:replicas][0].take_down do
        processes[:replicas][1].take_down do
          QUERY_COUNT.times do
            conn.async_exec("SELECT 1 + 2")
          rescue
            # Reconnect after a failed query so the remaining iterations can
            # still run, and record the failure.
            conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))
            failed_count += 1
          end
        end
      end

      # At most one failed query is tolerated per replica that was taken down.
      expect(failed_count).to be <= 2

      processes.all_databases.each do |instance|
        queries_routed = instance.count_select_1_plus_2
        if processes.replicas[0..1].include?(instance)
          # The two replicas that were down must not have served any query.
          expect(queries_routed).to eq(0)
        else
          expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)
        end
      end
    end
  end
end
|