| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 | # frozen_string_literal: truerequire_relative 'spec_helper'describe "Random Load Balancing" do  let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 5) }  after do    processes.all_databases.map(&:reset)    processes.pgcat.shutdown  end  context "under regular circumstances" do    it "balances query volume between all instances" do      conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))      query_count = QUERY_COUNT      expected_share = query_count / processes.all_databases.count      failed_count = 0      query_count.times do        conn.async_exec("SELECT 1 + 2")      rescue        failed_count += 1      end      expect(failed_count).to eq(0)      processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|        expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)      end    end  end  context "when some replicas are down" do    it "balances query volume between working instances" do      conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))      expected_share = QUERY_COUNT / (processes.all_databases.count - 2)      failed_count = 0      processes[:replicas][0].take_down do        processes[:replicas][1].take_down do          QUERY_COUNT.times do            conn.async_exec("SELECT 1 + 2")          rescue            conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))            failed_count += 1          end        end      end      processes.all_databases.each do |instance|        queries_routed = instance.count_select_1_plus_2        if processes.replicas[0..1].include?(instance)          expect(queries_routed).to eq(0)        else          expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)        end      end    end  endenddescribe "Least Outstanding Queries Load Balancing" do  let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 1, "transaction", "loc") }  after do    processes.all_databases.map(&:reset)    processes.pgcat.shutdown  end  context "under homogeneous load" do    it "balances query volume between all instances" do      conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))      query_count = QUERY_COUNT      expected_share = query_count / processes.all_databases.count      failed_count = 0      query_count.times do        conn.async_exec("SELECT 1 + 2")      rescue        failed_count += 1      end      expect(failed_count).to eq(0)      processes.all_databases.map(&:count_select_1_plus_2).each do |instance_share|        expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)      end    end  end  context "under heterogeneous load" do    xit "balances query volume between all instances based on how busy they are" do      slow_query_count = 2      threads = Array.new(slow_query_count) do        Thread.new do          conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))          conn.async_exec("BEGIN")        end      end      conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))      query_count = QUERY_COUNT      expected_share = query_count / (processes.all_databases.count - slow_query_count)      failed_count = 0      query_count.times do        conn.async_exec("SELECT 1 + 2")      rescue        failed_count += 1      end      expect(failed_count).to eq(0)      # Under LOQ, we expect replicas running the slow pg_sleep      # to get no selects      expect(        processes.          all_databases.          map(&:count_select_1_plus_2).          count { |instance_share| instance_share == 0 }      ).to eq(slow_query_count)      # We also expect the quick queries to be spread across      # the idle servers only      processes.        all_databases.        map(&:count_select_1_plus_2).        reject { |instance_share| instance_share == 0 }.        each do |instance_share|          expect(instance_share).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)      end      threads.map(&:join)    end  end  context "when some replicas are down" do    it "balances query volume between working instances" do      conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))      expected_share = QUERY_COUNT / (processes.all_databases.count - 2)      failed_count = 0      processes[:replicas][0].take_down do        processes[:replicas][1].take_down do          QUERY_COUNT.times do            conn.async_exec("SELECT 1 + 2")          rescue            conn = PG.connect(processes.pgcat.connection_string("sharded_db", "sharding_user"))            failed_count += 1          end        end      end      expect(failed_count).to be <= 2      processes.all_databases.each do |instance|        queries_routed = instance.count_select_1_plus_2        if processes.replicas[0..1].include?(instance)          expect(queries_routed).to eq(0)        else          expect(queries_routed).to be_within(expected_share * MARGIN_OF_ERROR).of(expected_share)        end      end    end  endend
 |