pg_instance.rb 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. require 'pg'
  2. require 'toxiproxy'
  3. class PgInstance
  4. attr_reader :port
  5. attr_reader :username
  6. attr_reader :password
  7. attr_reader :database_name
  8. def initialize(port, username, password, database_name)
  9. @original_port = port
  10. @toxiproxy_port = 10000 + port.to_i
  11. @port = @toxiproxy_port
  12. @username = username
  13. @password = password
  14. @database_name = database_name
  15. @toxiproxy_name = "database_#{@original_port}"
  16. Toxiproxy.populate([{
  17. name: @toxiproxy_name,
  18. listen: "0.0.0.0:#{@toxiproxy_port}",
  19. upstream: "localhost:#{@original_port}",
  20. }])
  21. # Toxiproxy server will outlive our PgInstance objects
  22. # so we want to destroy our proxies before exiting
  23. # Ruby finalizer is ideal for doing this
  24. ObjectSpace.define_finalizer(@toxiproxy_name, proc { Toxiproxy[@toxiproxy_name].destroy })
  25. end
  26. def with_connection
  27. conn = PG.connect("postgres://#{@username}:#{@password}@localhost:#{port}/#{database_name}")
  28. yield conn
  29. ensure
  30. conn&.close
  31. end
  32. def reset
  33. reset_toxics
  34. reset_stats
  35. drop_connections
  36. sleep 0.1
  37. end
  38. def toxiproxy
  39. Toxiproxy[@toxiproxy_name]
  40. end
  41. def take_down
  42. if block_given?
  43. Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 5).apply { yield }
  44. else
  45. Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 5).toxics.each(&:save)
  46. end
  47. end
  48. def add_latency(latency)
  49. if block_given?
  50. Toxiproxy[@toxiproxy_name].toxic(:latency, latency: latency).apply { yield }
  51. else
  52. Toxiproxy[@toxiproxy_name].toxic(:latency, latency: latency).toxics.each(&:save)
  53. end
  54. end
  55. def delete_proxy
  56. Toxiproxy[@toxiproxy_name].delete
  57. end
  58. def reset_toxics
  59. Toxiproxy[@toxiproxy_name].toxics.each(&:destroy)
  60. sleep 0.1
  61. end
  62. def reset_stats
  63. with_connection { |c| c.async_exec("SELECT pg_stat_statements_reset()") }
  64. end
  65. def drop_connections
  66. username = with_connection { |c| c.async_exec("SELECT current_user")[0]["current_user"] }
  67. with_connection { |c| c.async_exec("SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND usename='#{username}'") }
  68. end
  69. def count_connections
  70. with_connection { |c| c.async_exec("SELECT COUNT(*) as count FROM pg_stat_activity")[0]["count"].to_i }
  71. end
  72. def count_query(query)
  73. with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query = '#{query}'")[0]["sum"].to_i }
  74. end
  75. def count_select_1_plus_2
  76. with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query = 'SELECT $1 + $2'")[0]["sum"].to_i }
  77. end
  78. end