# pgcat_helper.rb 5.7 KB
  1. require 'json'
  2. require 'ostruct'
  3. require_relative 'pgcat_process'
  4. require_relative 'pg_instance'
  5. require_relative 'pg_socket'
  class ::Hash
    # Returns a new hash holding the receiver's entries recursively combined
    # with those of +second+. The receiver itself is left untouched.
    #
    # When a key exists on both sides and both values are hashes, the two
    # values are deep-merged; in every other conflict the value from
    # +second+ replaces the receiver's value.
    #
    # @param second [Hash] the hash whose entries take precedence
    # @return [Hash] the merged copy
    def deep_merge(second)
      merge(second) do |_key, ours, theirs|
        if ours.is_a?(Hash) && theirs.is_a?(Hash)
          ours.deep_merge(theirs)
        else
          theirs
        end
      end
    end
  end
  module Helpers
    # Builders used by the test suite: each one creates a PgcatProcess, writes
    # a single-pool configuration into it, runs the update_config / start /
    # wait_until_ready sequence and returns an OpenStruct bundling the process
    # with the PgInstance handles it was configured against.
    module Pgcat
      # Sets up one pool made of three shards (shard0..shard2), each served by
      # a single "primary" entry on localhost ports 5432, 7432 and 8432, with
      # the query parser, automatic sharding key and the "intercept" plugin
      # enabled.
      #
      # NOTE(review): unlike the two builders below, this one does not write
      # pgcat_cfg["general"]["port"]; that is preserved as-is here. Confirm
      # whether PgcatProcess already supplies the port on its own.
      #
      # @param pool_name [#to_s] name of the pool (used as the config key)
      # @param pool_size [Integer] value stored under the user's "pool_size"
      # @param pool_mode [String] value stored under "pool_mode"
      # @param lb_mode [String] value stored under "load_balancing_mode"
      # @param log_level [String] passed to PgcatProcess.new
      # @return [OpenStruct] with +pgcat+, +shards+ and +all_databases+
      def self.three_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random", log_level="info")
        user = default_user(pool_size)
        pgcat = PgcatProcess.new(log_level)

        # One instance per shard; the database name follows the shard index.
        shards = [5432, 7432, 8432].each_with_index.map do |port, index|
          PgInstance.new(port, user["username"], user["password"], "shard#{index}")
        end

        pgcat_cfg = pgcat.current_config

        shard_configs = {}
        shards.each_with_index do |shard, index|
          shard_configs[index.to_s] = {
            "database" => "shard#{index}",
            "servers" => [server_entry(shard, "primary")]
          }
        end

        pgcat_cfg["pools"] = {
          pool_name.to_s => {
            "default_role" => "any",
            "pool_mode" => pool_mode,
            "load_balancing_mode" => lb_mode,
            "primary_reads_enabled" => true,
            "query_parser_enabled" => true,
            "automatic_sharding_key" => "data.id",
            "sharding_function" => "pg_bigint_hash",
            "shards" => shard_configs,
            "users" => { "0" => user },
            "plugins" => {
              "intercept" => {
                "enabled" => true,
                "queries" => {
                  "0" => {
                    "query" => "select current_database() as a, current_schemas(false) as b",
                    "schema" => [
                      ["a", "text"],
                      ["b", "text"],
                    ],
                    "result" => [
                      ["${DATABASE}", "{public}"],
                    ]
                  }
                }
              }
            }
          }
        }

        launch(pgcat, pgcat_cfg)

        # shards and all_databases are handed out as separate arrays so that
        # mutating one in a test does not affect the other.
        OpenStruct.new(pgcat: pgcat, shards: shards, all_databases: shards.dup)
      end

      # Sets up one pool with a single shard ("shard0") served by one
      # "primary" entry on localhost port 5432; the query parser and primary
      # reads are disabled. Note the default log level here is "trace".
      #
      # @param pool_name [#to_s] name of the pool (used as the config key)
      # @param pool_size [Integer] value stored under the user's "pool_size"
      # @param pool_mode [String] value stored under "pool_mode"
      # @param lb_mode [String] value stored under "load_balancing_mode"
      # @param log_level [String] passed to PgcatProcess.new
      # @return [OpenStruct] with +pgcat+, +primary+ and +all_databases+
      def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random", log_level="trace")
        user = default_user(pool_size)
        pgcat = PgcatProcess.new(log_level)
        pgcat_cfg = pgcat.current_config
        primary = PgInstance.new(5432, user["username"], user["password"], "shard0")

        # Main proxy configs
        pgcat_cfg["pools"] = {
          pool_name.to_s => {
            "default_role" => "primary",
            "pool_mode" => pool_mode,
            "load_balancing_mode" => lb_mode,
            "primary_reads_enabled" => false,
            "query_parser_enabled" => false,
            "sharding_function" => "pg_bigint_hash",
            "shards" => {
              "0" => {
                "database" => "shard0",
                "servers" => [server_entry(primary, "primary")]
              },
            },
            "users" => { "0" => user }
          }
        }
        pgcat_cfg["general"]["port"] = pgcat.port

        launch(pgcat, pgcat_cfg)

        OpenStruct.new(pgcat: pgcat, primary: primary, all_databases: [primary])
      end

      # Sets up one pool with a single shard ("shard0") served by one
      # "primary" entry (port 5432) and three "replica" entries (ports 7432,
      # 8432 and 9432), all on localhost; the query parser and primary reads
      # are disabled.
      #
      # @param pool_name [#to_s] name of the pool (used as the config key)
      # @param pool_size [Integer] value stored under the user's "pool_size"
      # @param pool_mode [String] value stored under "pool_mode"
      # @param lb_mode [String] value stored under "load_balancing_mode"
      # @param log_level [String] passed to PgcatProcess.new
      # @return [OpenStruct] with +pgcat+, +primary+, +replicas+ and
      #   +all_databases+ (primary first, then the replicas)
      def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random", log_level="info")
        user = default_user(pool_size)
        pgcat = PgcatProcess.new(log_level)
        pgcat_cfg = pgcat.current_config
        primary = PgInstance.new(5432, user["username"], user["password"], "shard0")
        replicas = [7432, 8432, 9432].map do |port|
          PgInstance.new(port, user["username"], user["password"], "shard0")
        end

        # Main proxy configs
        pgcat_cfg["pools"] = {
          pool_name.to_s => {
            "default_role" => "any",
            "pool_mode" => pool_mode,
            "load_balancing_mode" => lb_mode,
            "primary_reads_enabled" => false,
            "query_parser_enabled" => false,
            "sharding_function" => "pg_bigint_hash",
            "shards" => {
              "0" => {
                "database" => "shard0",
                "servers" => [server_entry(primary, "primary")] +
                             replicas.map { |replica| server_entry(replica, "replica") }
              },
            },
            "users" => { "0" => user }
          }
        }
        pgcat_cfg["general"]["port"] = pgcat.port

        launch(pgcat, pgcat_cfg)

        OpenStruct.new(
          pgcat: pgcat,
          primary: primary,
          replicas: replicas,
          all_databases: [primary, *replicas]
        )
      end

      # Builds the user entry shared by every pool configuration.
      def self.default_user(pool_size)
        {
          "password" => "sharding_user",
          "pool_size" => pool_size,
          "statement_timeout" => 0,
          "username" => "sharding_user"
        }
      end

      # Builds one [host, port, role] server triple for a shard's "servers" list.
      def self.server_entry(instance, role)
        ["localhost", instance.port.to_s, role]
      end

      # Hands the configuration to the process, then starts it and waits on it.
      def self.launch(pgcat, config)
        pgcat.update_config(config)
        pgcat.start
        pgcat.wait_until_ready
      end

      private_class_method :default_user, :server_entry, :launch
    end
  end