sharding.rs 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. use serde_derive::{Deserialize, Serialize};
  2. /// Implements various sharding functions.
  3. use sha1::{Digest, Sha1};
  4. /// See: <https://github.com/postgres/postgres/blob/27b77ecf9f4d5be211900eda54d8155ada50d696/src/include/catalog/partition.h#L20>.
  5. const PARTITION_HASH_SEED: u64 = 0x7A5B22367996DCFD;
  6. /// The sharding functions we support.
  7. #[derive(Debug, PartialEq, Copy, Clone, Serialize, Deserialize, Hash, std::cmp::Eq)]
  8. pub enum ShardingFunction {
  9. #[serde(alias = "pg_bigint_hash", alias = "PgBigintHash")]
  10. PgBigintHash,
  11. #[serde(alias = "sha1", alias = "Sha1")]
  12. Sha1,
  13. }
  14. impl ToString for ShardingFunction {
  15. fn to_string(&self) -> String {
  16. match *self {
  17. ShardingFunction::PgBigintHash => "pg_bigint_hash".to_string(),
  18. ShardingFunction::Sha1 => "sha1".to_string(),
  19. }
  20. }
  21. }
  22. /// The sharder.
  23. pub struct Sharder {
  24. /// Number of shards in the cluster.
  25. shards: usize,
  26. /// The sharding function in use.
  27. sharding_function: ShardingFunction,
  28. }
  29. impl Sharder {
  30. /// Create new instance of the sharder.
  31. pub fn new(shards: usize, sharding_function: ShardingFunction) -> Sharder {
  32. Sharder {
  33. shards,
  34. sharding_function,
  35. }
  36. }
  37. /// Compute the shard given sharding key.
  38. pub fn shard(&self, key: i64) -> usize {
  39. match self.sharding_function {
  40. ShardingFunction::PgBigintHash => self.pg_bigint_hash(key),
  41. ShardingFunction::Sha1 => self.sha1(key),
  42. }
  43. }
  44. /// Hash function used by Postgres to determine which partition
  45. /// to put the row in when using HASH(column) partitioning.
  46. /// Source: <https://github.com/postgres/postgres/blob/27b77ecf9f4d5be211900eda54d8155ada50d696/src/common/hashfn.c#L631>.
  47. /// Supports only 1 bigint at the moment, but we can add more later.
  48. fn pg_bigint_hash(&self, key: i64) -> usize {
  49. let mut lohalf = key as u32;
  50. let hihalf = (key >> 32) as u32;
  51. lohalf ^= if key >= 0 { hihalf } else { !hihalf };
  52. Self::combine(0, Self::pg_u32_hash(lohalf)) as usize % self.shards
  53. }
  54. /// Example of a hashing function based on SHA1.
  55. fn sha1(&self, key: i64) -> usize {
  56. let mut hasher = Sha1::new();
  57. hasher.update(&key.to_string().as_bytes());
  58. let result = hasher.finalize();
  59. // Convert the SHA1 hash into hex so we can parse it as a large integer.
  60. let hex = format!("{:x}", result);
  61. // Parse the last 8 bytes as an integer (8 bytes = bigint).
  62. let key = i64::from_str_radix(&hex[hex.len() - 8..], 16).unwrap() as usize;
  63. key % self.shards
  64. }
  65. #[inline]
  66. fn rot(x: u32, k: u32) -> u32 {
  67. (x << k) | (x >> (32 - k))
  68. }
  69. #[inline]
  70. fn mix(mut a: u32, mut b: u32, mut c: u32) -> (u32, u32, u32) {
  71. a = a.wrapping_sub(c);
  72. a ^= Self::rot(c, 4);
  73. c = c.wrapping_add(b);
  74. b = b.wrapping_sub(a);
  75. b ^= Self::rot(a, 6);
  76. a = a.wrapping_add(c);
  77. c = c.wrapping_sub(b);
  78. c ^= Self::rot(b, 8);
  79. b = b.wrapping_add(a);
  80. a = a.wrapping_sub(c);
  81. a ^= Self::rot(c, 16);
  82. c = c.wrapping_add(b);
  83. b = b.wrapping_sub(a);
  84. b ^= Self::rot(a, 19);
  85. a = a.wrapping_add(c);
  86. c = c.wrapping_sub(b);
  87. c ^= Self::rot(b, 4);
  88. b = b.wrapping_add(a);
  89. (a, b, c)
  90. }
  91. #[inline]
  92. fn _final(mut a: u32, mut b: u32, mut c: u32) -> (u32, u32, u32) {
  93. c ^= b;
  94. c = c.wrapping_sub(Self::rot(b, 14));
  95. a ^= c;
  96. a = a.wrapping_sub(Self::rot(c, 11));
  97. b ^= a;
  98. b = b.wrapping_sub(Self::rot(a, 25));
  99. c ^= b;
  100. c = c.wrapping_sub(Self::rot(b, 16));
  101. a ^= c;
  102. a = a.wrapping_sub(Self::rot(c, 4));
  103. b ^= a;
  104. b = b.wrapping_sub(Self::rot(a, 14));
  105. c ^= b;
  106. c = c.wrapping_sub(Self::rot(b, 24));
  107. (a, b, c)
  108. }
  109. #[inline]
  110. fn combine(mut a: u64, b: u64) -> u64 {
  111. a ^= b
  112. .wrapping_add(0x49a0f4dd15e5a8e3_u64)
  113. .wrapping_add(a << 54)
  114. .wrapping_add(a >> 7);
  115. a
  116. }
  117. #[inline]
  118. fn pg_u32_hash(k: u32) -> u64 {
  119. let mut a: u32 = 0x9e3779b9_u32 + std::mem::size_of::<u32>() as u32 + 3923095_u32;
  120. let mut b = a;
  121. let c = a;
  122. a = a.wrapping_add((PARTITION_HASH_SEED >> 32) as u32);
  123. b = b.wrapping_add(PARTITION_HASH_SEED as u32);
  124. let (mut a, b, c) = Self::mix(a, b, c);
  125. a = a.wrapping_add(k);
  126. let (_a, b, c) = Self::_final(a, b, c);
  127. ((b as u64) << 32) | (c as u64)
  128. }
  129. }
  130. #[cfg(test)]
  131. mod test {
  132. use super::*;
  133. // See tests/sharding/partition_hash_test_setup.sql
  134. // The output of those SELECT statements will match this test,
  135. // confirming that we implemented Postgres BIGINT hashing correctly.
  136. #[test]
  137. fn test_pg_bigint_hash() {
  138. let sharder = Sharder::new(5, ShardingFunction::PgBigintHash);
  139. let shard_0 = vec![1, 4, 5, 14, 19, 39, 40, 46, 47, 53];
  140. for v in shard_0 {
  141. assert_eq!(sharder.shard(v), 0);
  142. }
  143. let shard_1 = vec![2, 3, 11, 17, 21, 23, 30, 49, 51, 54];
  144. for v in shard_1 {
  145. assert_eq!(sharder.shard(v), 1);
  146. }
  147. let shard_2 = vec![6, 7, 15, 16, 18, 20, 25, 28, 34, 35];
  148. for v in shard_2 {
  149. assert_eq!(sharder.shard(v), 2);
  150. }
  151. let shard_3 = vec![8, 12, 13, 22, 29, 31, 33, 36, 41, 43];
  152. for v in shard_3 {
  153. assert_eq!(sharder.shard(v), 3);
  154. }
  155. let shard_4 = vec![9, 10, 24, 26, 27, 32, 37, 38, 42, 45];
  156. for v in shard_4 {
  157. assert_eq!(sharder.shard(v), 4);
  158. }
  159. }
  160. #[test]
  161. fn test_sha1_hash() {
  162. let sharder = Sharder::new(12, ShardingFunction::Sha1);
  163. let ids = vec![
  164. 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
  165. ];
  166. let shards = vec![
  167. 4, 7, 8, 3, 6, 0, 0, 10, 3, 11, 1, 7, 4, 4, 11, 2, 5, 0, 8, 3,
  168. ];
  169. for (i, id) in ids.iter().enumerate() {
  170. assert_eq!(sharder.shard(*id), shards[i]);
  171. }
  172. }
  173. }