pg_socket.rb 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. require 'socket'
  2. require 'digest/md5'
  3. BACKEND_MESSAGE_CODES = {
  4. 'Z' => "ReadyForQuery",
  5. 'C' => "CommandComplete",
  6. 'T' => "RowDescription",
  7. 'D' => "DataRow",
  8. '1' => "ParseComplete",
  9. '2' => "BindComplete",
  10. 'E' => "ErrorResponse",
  11. 's' => "PortalSuspended",
  12. }
  13. class PostgresSocket
  14. def initialize(host, port)
  15. @port = port
  16. @host = host
  17. @socket = TCPSocket.new @host, @port
  18. @parameters = {}
  19. @verbose = true
  20. end
  21. def send_md5_password_message(username, password, salt)
  22. m = Digest::MD5.hexdigest(password + username)
  23. m = Digest::MD5.hexdigest(m + salt.map(&:chr).join(""))
  24. m = 'md5' + m
  25. bytes = (m.split("").map(&:ord) + [0]).flatten
  26. message_size = bytes.count + 4
  27. message = []
  28. message << 'p'.ord
  29. message << [message_size].pack('l>').unpack('CCCC') # 4
  30. message << bytes
  31. message.flatten!
  32. @socket.write(message.pack('C*'))
  33. end
  34. def send_startup_message(username, database, password)
  35. message = []
  36. message << [196608].pack('l>').unpack('CCCC') # 4
  37. message << "user".split('').map(&:ord) # 4, 8
  38. message << 0 # 1, 9
  39. message << username.split('').map(&:ord) # 2, 11
  40. message << 0 # 1, 12
  41. message << "database".split('').map(&:ord) # 8, 20
  42. message << 0 # 1, 21
  43. message << database.split('').map(&:ord) # 2, 23
  44. message << 0 # 1, 24
  45. message << 0 # 1, 25
  46. message.flatten!
  47. total_message_size = message.size + 4
  48. message_len = [total_message_size].pack('l>').unpack('CCCC')
  49. @socket.write([message_len + message].flatten.pack('C*'))
  50. sleep 0.1
  51. read_startup_response(username, password)
  52. end
  53. def read_startup_response(username, password)
  54. message_code, message_len = @socket.recv(5).unpack("al>")
  55. while message_code == 'R'
  56. auth_code = @socket.recv(4).unpack('l>').pop
  57. case auth_code
  58. when 5 # md5
  59. salt = @socket.recv(4).unpack('CCCC')
  60. send_md5_password_message(username, password, salt)
  61. message_code, message_len = @socket.recv(5).unpack("al>")
  62. when 0 # trust
  63. break
  64. end
  65. end
  66. loop do
  67. message_code, message_len = @socket.recv(5).unpack("al>")
  68. if message_code == 'Z'
  69. @socket.recv(1).unpack("a") # most likely I
  70. break # We are good to go
  71. end
  72. if message_code == 'S'
  73. actual_message = @socket.recv(message_len - 4).unpack("C*")
  74. k,v = actual_message.pack('U*').split(/\x00/)
  75. @parameters[k] = v
  76. end
  77. if message_code == 'K'
  78. process_id, secret_key = @socket.recv(message_len - 4).unpack("l>l>")
  79. @parameters["process_id"] = process_id
  80. @parameters["secret_key"] = secret_key
  81. end
  82. end
  83. return @parameters
  84. end
  85. def cancel_query
  86. socket = TCPSocket.new @host, @port
  87. process_key = @parameters["process_id"]
  88. secret_key = @parameters["secret_key"]
  89. message = []
  90. message << [16].pack('l>').unpack('CCCC') # 4
  91. message << [80877102].pack('l>').unpack('CCCC') # 4
  92. message << [process_key.to_i].pack('l>').unpack('CCCC') # 4
  93. message << [secret_key.to_i].pack('l>').unpack('CCCC') # 4
  94. message.flatten!
  95. socket.write(message.flatten.pack('C*'))
  96. socket.close
  97. log "[F] Sent CancelRequest message"
  98. end
  99. def send_query_message(query)
  100. query_size = query.length
  101. message_size = 1 + 4 + query_size
  102. message = []
  103. message << "Q".ord
  104. message << [message_size].pack('l>').unpack('CCCC') # 4
  105. message << query.split('').map(&:ord) # 2, 11
  106. message << 0 # 1, 12
  107. message.flatten!
  108. @socket.write(message.flatten.pack('C*'))
  109. log "[F] Sent Q message (#{query})"
  110. end
  111. def send_parse_message(query)
  112. query_size = query.length
  113. message_size = 2 + 2 + 4 + query_size
  114. message = []
  115. message << "P".ord
  116. message << [message_size].pack('l>').unpack('CCCC') # 4
  117. message << 0 # unnamed statement
  118. message << query.split('').map(&:ord) # 2, 11
  119. message << 0 # 1, 12
  120. message << [0, 0]
  121. message.flatten!
  122. @socket.write(message.flatten.pack('C*'))
  123. log "[F] Sent P message (#{query})"
  124. end
  125. def send_bind_message
  126. message = []
  127. message << "B".ord
  128. message << [12].pack('l>').unpack('CCCC') # 4
  129. message << 0 # unnamed statement
  130. message << 0 # unnamed statement
  131. message << [0, 0] # 2
  132. message << [0, 0] # 2
  133. message << [0, 0] # 2
  134. message.flatten!
  135. @socket.write(message.flatten.pack('C*'))
  136. log "[F] Sent B message"
  137. end
  138. def send_describe_message(mode)
  139. message = []
  140. message << "D".ord
  141. message << [6].pack('l>').unpack('CCCC') # 4
  142. message << mode.ord
  143. message << 0 # unnamed statement
  144. message.flatten!
  145. @socket.write(message.flatten.pack('C*'))
  146. log "[F] Sent D message"
  147. end
  148. def send_execute_message(limit=0)
  149. message = []
  150. message << "E".ord
  151. message << [9].pack('l>').unpack('CCCC') # 4
  152. message << 0 # unnamed statement
  153. message << [limit].pack('l>').unpack('CCCC') # 4
  154. message.flatten!
  155. @socket.write(message.flatten.pack('C*'))
  156. log "[F] Sent E message"
  157. end
  158. def send_sync_message
  159. message = []
  160. message << "S".ord
  161. message << [4].pack('l>').unpack('CCCC') # 4
  162. message.flatten!
  163. @socket.write(message.flatten.pack('C*'))
  164. log "[F] Sent S message"
  165. end
  166. def send_copydone_message
  167. message = []
  168. message << "c".ord
  169. message << [4].pack('l>').unpack('CCCC') # 4
  170. message.flatten!
  171. @socket.write(message.flatten.pack('C*'))
  172. log "[F] Sent c message"
  173. end
  174. def send_copyfail_message
  175. message = []
  176. message << "f".ord
  177. message << [5].pack('l>').unpack('CCCC') # 4
  178. message << 0
  179. message.flatten!
  180. @socket.write(message.flatten.pack('C*'))
  181. log "[F] Sent f message"
  182. end
  183. def send_flush_message
  184. message = []
  185. message << "H".ord
  186. message << [4].pack('l>').unpack('CCCC') # 4
  187. message.flatten!
  188. @socket.write(message.flatten.pack('C*'))
  189. log "[F] Sent H message"
  190. end
  191. def read_from_server()
  192. output_messages = []
  193. retry_count = 0
  194. message_code = nil
  195. message_len = 0
  196. loop do
  197. begin
  198. message_code, message_len = @socket.recv_nonblock(5).unpack("al>")
  199. rescue IO::WaitReadable
  200. return output_messages if retry_count > 50
  201. retry_count += 1
  202. sleep(0.01)
  203. next
  204. end
  205. message = {
  206. code: message_code,
  207. len: message_len,
  208. bytes: []
  209. }
  210. log "[B] #{BACKEND_MESSAGE_CODES[message_code] || ('UnknownMessage(' + message_code + ')')}"
  211. actual_message_length = message_len - 4
  212. if actual_message_length > 0
  213. message[:bytes] = @socket.recv(message_len - 4).unpack("C*")
  214. log "\t#{message[:bytes].join(",")}"
  215. log "\t#{message[:bytes].map(&:chr).join(" ")}"
  216. end
  217. output_messages << message
  218. return output_messages if message_code == 'Z'
  219. end
  220. end
  221. def log(msg)
  222. return unless @verbose
  223. puts msg
  224. end
  225. def close
  226. @socket.close
  227. end
  228. end