12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 |
- require 'pg'
- require 'toxiproxy'
- class PgInstance
- attr_reader :port
- attr_reader :username
- attr_reader :password
- attr_reader :database_name
- def initialize(port, username, password, database_name)
- @original_port = port
- @toxiproxy_port = 10000 + port.to_i
- @port = @toxiproxy_port
- @username = username
- @password = password
- @database_name = database_name
- @toxiproxy_name = "database_#{@original_port}"
- Toxiproxy.populate([{
- name: @toxiproxy_name,
- listen: "0.0.0.0:#{@toxiproxy_port}",
- upstream: "localhost:#{@original_port}",
- }])
- # Toxiproxy server will outlive our PgInstance objects
- # so we want to destroy our proxies before exiting
- # Ruby finalizer is ideal for doing this
- ObjectSpace.define_finalizer(@toxiproxy_name, proc { Toxiproxy[@toxiproxy_name].destroy })
- end
- def with_connection
- conn = PG.connect("postgres://#{@username}:#{@password}@localhost:#{port}/#{database_name}")
- yield conn
- ensure
- conn&.close
- end
- def reset
- reset_toxics
- reset_stats
- drop_connections
- sleep 0.1
- end
- def toxiproxy
- Toxiproxy[@toxiproxy_name]
- end
- def take_down
- if block_given?
- Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 5).apply { yield }
- else
- Toxiproxy[@toxiproxy_name].toxic(:limit_data, bytes: 5).toxics.each(&:save)
- end
- end
- def add_latency(latency)
- if block_given?
- Toxiproxy[@toxiproxy_name].toxic(:latency, latency: latency).apply { yield }
- else
- Toxiproxy[@toxiproxy_name].toxic(:latency, latency: latency).toxics.each(&:save)
- end
- end
- def delete_proxy
- Toxiproxy[@toxiproxy_name].delete
- end
- def reset_toxics
- Toxiproxy[@toxiproxy_name].toxics.each(&:destroy)
- sleep 0.1
- end
- def reset_stats
- with_connection { |c| c.async_exec("SELECT pg_stat_statements_reset()") }
- end
- def drop_connections
- username = with_connection { |c| c.async_exec("SELECT current_user")[0]["current_user"] }
- with_connection { |c| c.async_exec("SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid <> pg_backend_pid() AND usename='#{username}'") }
- end
- def count_connections
- with_connection { |c| c.async_exec("SELECT COUNT(*) as count FROM pg_stat_activity")[0]["count"].to_i }
- end
- def count_query(query)
- with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query = '#{query}'")[0]["sum"].to_i }
- end
- def count_select_1_plus_2
- with_connection { |c| c.async_exec("SELECT SUM(calls) FROM pg_stat_statements WHERE query = 'SELECT $1 + $2'")[0]["sum"].to_i }
- end
- end
|