auth_query_helper.rb 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. module Helpers
  2. module AuthQuery
  3. def self.single_shard_auth_query(
  4. pg_user:,
  5. config_user:,
  6. pool_name:,
  7. extra_conf: {},
  8. log_level: 'debug',
  9. wait_until_ready: true
  10. )
  11. user = {
  12. "pool_size" => 10,
  13. "statement_timeout" => 0,
  14. }
  15. pgcat = PgcatProcess.new(log_level)
  16. pgcat_cfg = pgcat.current_config.deep_merge(extra_conf)
  17. primary = PgInstance.new(5432, pg_user["username"], pg_user["password"], "shard0")
  18. replica = PgInstance.new(10432, pg_user["username"], pg_user["password"], "shard0")
  19. # Main proxy configs
  20. pgcat_cfg["pools"] = {
  21. "#{pool_name}" => {
  22. "default_role" => "any",
  23. "pool_mode" => "transaction",
  24. "load_balancing_mode" => "random",
  25. "primary_reads_enabled" => false,
  26. "query_parser_enabled" => false,
  27. "sharding_function" => "pg_bigint_hash",
  28. "shards" => {
  29. "0" => {
  30. "database" => "shard0",
  31. "servers" => [
  32. ["localhost", primary.port.to_s, "primary"],
  33. ["localhost", replica.port.to_s, "replica"],
  34. ]
  35. },
  36. },
  37. "users" => { "0" => user.merge(config_user) }
  38. }
  39. }
  40. pgcat_cfg["general"]["port"] = pgcat.port
  41. pgcat.update_config(pgcat_cfg)
  42. pgcat.start
  43. pgcat.wait_until_ready(
  44. pgcat.connection_string(
  45. "sharded_db",
  46. pg_user['username'],
  47. pg_user['password']
  48. )
  49. ) if wait_until_ready
  50. OpenStruct.new.tap do |struct|
  51. struct.pgcat = pgcat
  52. struct.primary = primary
  53. struct.replicas = [replica]
  54. struct.all_databases = [primary]
  55. end
  56. end
  57. def self.two_pools_auth_query(
  58. pg_user:,
  59. config_user:,
  60. pool_names:,
  61. extra_conf: {},
  62. log_level: 'debug'
  63. )
  64. user = {
  65. "pool_size" => 10,
  66. "statement_timeout" => 0,
  67. }
  68. pgcat = PgcatProcess.new(log_level)
  69. pgcat_cfg = pgcat.current_config
  70. primary = PgInstance.new(5432, pg_user["username"], pg_user["password"], "shard0")
  71. replica = PgInstance.new(10432, pg_user["username"], pg_user["password"], "shard0")
  72. pool_template = Proc.new do |database|
  73. {
  74. "default_role" => "any",
  75. "pool_mode" => "transaction",
  76. "load_balancing_mode" => "random",
  77. "primary_reads_enabled" => false,
  78. "query_parser_enabled" => false,
  79. "sharding_function" => "pg_bigint_hash",
  80. "shards" => {
  81. "0" => {
  82. "database" => database,
  83. "servers" => [
  84. ["localhost", primary.port.to_s, "primary"],
  85. ["localhost", replica.port.to_s, "replica"],
  86. ]
  87. },
  88. },
  89. "users" => { "0" => user.merge(config_user) }
  90. }
  91. end
  92. # Main proxy configs
  93. pgcat_cfg["pools"] = {
  94. "#{pool_names[0]}" => pool_template.call("shard0"),
  95. "#{pool_names[1]}" => pool_template.call("shard1")
  96. }
  97. pgcat_cfg["general"]["port"] = pgcat.port
  98. pgcat.update_config(pgcat_cfg.deep_merge(extra_conf))
  99. pgcat.start
  100. pgcat.wait_until_ready(pgcat.connection_string("sharded_db0", pg_user['username'], pg_user['password']))
  101. OpenStruct.new.tap do |struct|
  102. struct.pgcat = pgcat
  103. struct.primary = primary
  104. struct.replicas = [replica]
  105. struct.all_databases = [primary]
  106. end
  107. end
  108. def self.create_query_auth_function(user)
  109. return <<-SQL
  110. CREATE OR REPLACE FUNCTION public.user_lookup(in i_username text, out uname text, out phash text)
  111. RETURNS record AS $$
  112. BEGIN
  113. SELECT usename, passwd FROM pg_catalog.pg_shadow
  114. WHERE usename = i_username INTO uname, phash;
  115. RETURN;
  116. END;
  117. $$ LANGUAGE plpgsql SECURITY DEFINER;
  118. GRANT EXECUTE ON FUNCTION public.user_lookup(text) TO #{user};
  119. SQL
  120. end
  121. def self.exec_in_instances(query:, instance_ports: [ 5432, 10432 ], database: 'postgres', user: 'postgres', password: 'postgres')
  122. instance_ports.each do |port|
  123. c = PG.connect("postgres://#{user}:#{password}@localhost:#{port}/#{database}")
  124. c.exec(query)
  125. c.close
  126. end
  127. end
  128. def self.set_up_auth_query_for_user(user:, password:, instance_ports: [ 5432, 10432 ], database: 'shard0' )
  129. instance_ports.each do |port|
  130. connection = PG.connect("postgres://postgres:postgres@localhost:#{port}/#{database}")
  131. connection.exec(self.drop_query_auth_function(user)) rescue PG::UndefinedFunction
  132. connection.exec("DROP ROLE #{user}") rescue PG::UndefinedObject
  133. connection.exec("CREATE ROLE #{user} ENCRYPTED PASSWORD '#{password}' LOGIN;")
  134. connection.exec(self.create_query_auth_function(user))
  135. connection.close
  136. end
  137. end
  138. def self.tear_down_auth_query_for_user(user:, password:, instance_ports: [ 5432, 10432 ], database: 'shard0' )
  139. instance_ports.each do |port|
  140. connection = PG.connect("postgres://postgres:postgres@localhost:#{port}/#{database}")
  141. connection.exec(self.drop_query_auth_function(user)) rescue PG::UndefinedFunction
  142. connection.exec("DROP ROLE #{user}")
  143. connection.close
  144. end
  145. end
  146. def self.drop_query_auth_function(user)
  147. return <<-SQL
  148. REVOKE ALL ON FUNCTION public.user_lookup(text) FROM public, #{user};
  149. DROP FUNCTION public.user_lookup(in i_username text, out uname text, out phash text);
  150. SQL
  151. end
  152. end
  153. end