# relocation.py

from __future__ import annotations

import logging
from io import BytesIO
from string import Template
from typing import Optional

from cryptography.fernet import Fernet

from sentry.backup.dependencies import NormalizedModelName, get_model
from sentry.backup.exports import export_in_config_scope, export_in_user_scope
from sentry.backup.helpers import (
    DEFAULT_CRYPTO_KEY_VERSION,
    decrypt_data_encryption_key_using_gcp_kms,
    get_public_key_using_gcp_kms,
    unwrap_encrypted_export_tarball,
)
from sentry.filestore.gcs import GoogleCloudStorage
from sentry.models.files.file import File
from sentry.models.files.utils import get_storage
from sentry.models.organization import Organization
from sentry.models.relocation import Relocation, RelocationFile
from sentry.models.user import User
from sentry.silo import SiloMode
from sentry.tasks.base import instrumented_task
from sentry.utils import json
from sentry.utils.relocation import (
    RELOCATION_BLOB_SIZE,
    RELOCATION_FILE_TYPE,
    OrderedTask,
    fail_relocation,
    retry_task_or_fail_relocation,
    start_relocation_task,
)
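
# Note: the helpers imported from `sentry.utils.relocation` are used below as follows (behavioral
# summary inferred from their usage in this file, not an authoritative API description):
# `start_relocation_task` returns an `(Optional[Relocation], attempts_left)` pair, handing back
# `None` when the task should not run at all, and `retry_task_or_fail_relocation` is a context
# manager that lets exceptions bubble up for a retry while attempts remain, otherwise marking the
# relocation as failed with the supplied error message.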

logger = logging.getLogger(__name__)

# Time limits for various steps in the process.
RETRY_BACKOFF = 60  # So the 1st retry is after ~1 min, 2nd after ~2 min, 3rd after ~4 min.
UPLOADING_TIME_LIMIT = 60  # This should be quick - we're just pinging the DB, then GCS.
PREPROCESSING_TIME_LIMIT = 60 * 5  # 5 minutes is plenty for all preprocessing task attempts.

# All pre and post processing tasks have the same number of retries.
MAX_FAST_TASK_RETRIES = 2
MAX_FAST_TASK_ATTEMPTS = MAX_FAST_TASK_RETRIES + 1

# Some reasonable limits on the amount of data we import - we can adjust these as needed.
MAX_ORGS_PER_RELOCATION = 20
MAX_USERS_PER_RELOCATION = 200

RELOCATION_FILES_TO_BE_VALIDATED = [
    RelocationFile.Kind.BASELINE_CONFIG_VALIDATION_DATA,
    RelocationFile.Kind.COLLIDING_USERS_VALIDATION_DATA,
    RelocationFile.Kind.RAW_USER_DATA,
]

# Various error strings that we want to surface to users, grouped by step.
ERR_UPLOADING_FAILED = "Internal error during file upload."
ERR_PREPROCESSING_DECRYPTION = """Could not decrypt the imported JSON - are you sure you used the
correct public key?"""
ERR_PREPROCESSING_INTERNAL = "Internal error during preprocessing."
ERR_PREPROCESSING_INVALID_JSON = "Invalid input JSON."
ERR_PREPROCESSING_INVALID_TARBALL = "The import tarball you provided was invalid."
ERR_PREPROCESSING_NO_USERS = "The provided JSON must contain at least one user."
ERR_PREPROCESSING_TOO_MANY_USERS = Template(
    f"The provided JSON contains $count users, which exceeds the limit of {MAX_USERS_PER_RELOCATION}."
)
ERR_PREPROCESSING_NO_ORGS = "The provided JSON must contain at least one organization."
ERR_PREPROCESSING_TOO_MANY_ORGS = Template(
    f"The provided JSON contains $count organizations, which exceeds the limit of {MAX_ORGS_PER_RELOCATION}."
)
ERR_PREPROCESSING_MISSING_ORGS = Template(
    "The following organization slug imports were requested, but could not be found in your submitted JSON: $orgs."
)
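
# For illustration: with `MAX_USERS_PER_RELOCATION = 200`, a call like
# `ERR_PREPROCESSING_TOO_MANY_USERS.substitute(count=250)` renders as
# "The provided JSON contains 250 users, which exceeds the limit of 200."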


# TODO(getsentry/team-ospo#203): We should split this task in two, one for "small" imports of say
# <=10MB, and one for large imports >10MB. Then we should limit the number of daily executions of
# the latter.
@instrumented_task(
    name="sentry.relocation.uploading_complete",
    queue="relocation",
    max_retries=MAX_FAST_TASK_RETRIES,
    retry_backoff=RETRY_BACKOFF,
    retry_backoff_jitter=True,
    soft_time_limit=UPLOADING_TIME_LIMIT,
)
def uploading_complete(uuid: str) -> None:
    """
    Just check to ensure that uploading the (potentially very large!) backup file has completed
    before we try to do all sorts of fun stuff with it.
    """

    relocation: Optional[Relocation]
    attempts_left: int
    (relocation, attempts_left) = start_relocation_task(
        uuid=uuid,
        step=Relocation.Step.UPLOADING,
        task=OrderedTask.UPLOADING_COMPLETE,
        allowed_task_attempts=MAX_FAST_TASK_ATTEMPTS,
    )
    if relocation is None:
        return

    # Pull down the `RelocationFile` associated with this `Relocation`. Fallibility is expected
    # here: we're pushing a potentially very large file with many blobs to a cloud store, so it is
    # possible (likely, even) that not all of the blobs are yet available. If this segment fails,
    # we'll just allow the Exception to bubble up and retry the task if possible.
    with retry_task_or_fail_relocation(
        relocation,
        OrderedTask.UPLOADING_COMPLETE,
        attempts_left,
        ERR_UPLOADING_FAILED,
    ):
        raw_relocation_file = (
            RelocationFile.objects.filter(
                relocation=relocation,
                kind=RelocationFile.Kind.RAW_USER_DATA.value,
            )
            .select_related("file")
            .first()
        )

        fp = raw_relocation_file.file.getfile()
        with fp:
            preprocessing_scan.delay(uuid)
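

# Illustrative only (not part of the original module): the pipeline is started by scheduling
# `uploading_complete` once an upload has been recorded. The snippet below is a hypothetical
# sketch of such a caller, mirroring the `.delay(uuid)` hand-offs used between the tasks in this
# file.
#
#     relocation = Relocation.objects.create(...)  # hypothetical caller, e.g. an upload endpoint
#     uploading_complete.delay(str(relocation.uuid))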


@instrumented_task(
    name="sentry.relocation.preprocessing_scan",
    queue="relocation",
    max_retries=MAX_FAST_TASK_RETRIES,
    retry_backoff=RETRY_BACKOFF,
    retry_backoff_jitter=True,
    soft_time_limit=PREPROCESSING_TIME_LIMIT,
    silo_mode=SiloMode.REGION,
)
def preprocessing_scan(uuid: str) -> None:
    """
    Performs the very first part of the `PREPROCESSING` step of a `Relocation`, which involves
    decrypting the user-supplied tarball and picking out some useful information from it. This lets
    us validate a few things:

        - Ensuring that the user gave us properly encrypted data (was it encrypted? With the right
          key?).
        - Ensuring that the org slug the user supplied exists in the provided JSON data.
        - Recording the slugs of the orgs the relocation is attempting to import.
        - Recording the usernames of the users the relocation is attempting to import.

    Of the preprocessing tasks, this is the most resource-onerous (what if the importer provides a
    2GB JSON blob? What if they have 20,000 usernames? Etc...) so we should take care with our
    retry logic and set careful limits.

    This function is meant to be idempotent, and should be retried with an exponential backoff.
    """

    relocation: Optional[Relocation]
    attempts_left: int
    (relocation, attempts_left) = start_relocation_task(
        uuid=uuid,
        step=Relocation.Step.PREPROCESSING,
        task=OrderedTask.PREPROCESSING_SCAN,
        allowed_task_attempts=MAX_FAST_TASK_ATTEMPTS,
    )
    if relocation is None:
        return

    with retry_task_or_fail_relocation(
        relocation,
        OrderedTask.PREPROCESSING_SCAN,
        attempts_left,
        ERR_PREPROCESSING_INTERNAL,
    ):
        # The `uploading_complete` task above should have verified that this is ready for use.
        raw_relocation_file = (
            RelocationFile.objects.filter(
                relocation=relocation,
                kind=RelocationFile.Kind.RAW_USER_DATA.value,
            )
            .select_related("file")
            .first()
        )

        fp = raw_relocation_file.file.getfile()
        with fp:
            try:
                unwrapped = unwrap_encrypted_export_tarball(fp)
            except Exception:
                return fail_relocation(
                    relocation,
                    OrderedTask.PREPROCESSING_SCAN,
                    ERR_PREPROCESSING_INVALID_TARBALL,
                )

            # Decrypt the DEK using Google KMS, and use the decrypted DEK to decrypt the encoded
            # JSON.
            try:
                plaintext_data_encryption_key = decrypt_data_encryption_key_using_gcp_kms(
                    unwrapped,
                    json.dumps(DEFAULT_CRYPTO_KEY_VERSION).encode("utf-8"),
                )
                decryptor = Fernet(plaintext_data_encryption_key)
                json_data = decryptor.decrypt(unwrapped.encrypted_json_blob).decode("utf-8")
            except Exception:
                return fail_relocation(
                    relocation,
                    OrderedTask.PREPROCESSING_SCAN,
                    ERR_PREPROCESSING_DECRYPTION,
                )

            # Grab usernames and org slugs from the JSON data.
            usernames = []
            org_slugs = []
            try:
                for json_model in json.loads(json_data):
                    model_name = NormalizedModelName(json_model["model"])
                    if get_model(model_name) == Organization:
                        org_slugs.append(json_model["fields"]["slug"])
                        # TODO(getsentry/team-ospo#190): Validate slug using regex, so that we can
                        # fail early on obviously invalid slugs. Also keeps the database
                        # `JSONField` from ballooning on bad input.
                    if get_model(model_name) == User:
                        usernames.append(json_model["fields"]["username"])
                        # TODO(getsentry/team-ospo#190): Validate username using regex, so that we
                        # can fail early on obviously invalid usernames. Also keeps the database
                        # `JSONField` from ballooning on bad input.
            except KeyError:
                return fail_relocation(
                    relocation, OrderedTask.PREPROCESSING_SCAN, ERR_PREPROCESSING_INVALID_JSON
                )

            # Ensure that the data is reasonable and within our set bounds before we start on the
            # next task.
            if len(usernames) == 0:
                return fail_relocation(
                    relocation,
                    OrderedTask.PREPROCESSING_SCAN,
                    ERR_PREPROCESSING_NO_USERS,
                )
            if len(usernames) > MAX_USERS_PER_RELOCATION:
                return fail_relocation(
                    relocation,
                    OrderedTask.PREPROCESSING_SCAN,
                    ERR_PREPROCESSING_TOO_MANY_USERS.substitute(count=len(usernames)),
                )
            if len(org_slugs) == 0:
                return fail_relocation(
                    relocation,
                    OrderedTask.PREPROCESSING_SCAN,
                    ERR_PREPROCESSING_NO_ORGS,
                )
            if len(org_slugs) > MAX_ORGS_PER_RELOCATION:
                return fail_relocation(
                    relocation,
                    OrderedTask.PREPROCESSING_SCAN,
                    ERR_PREPROCESSING_TOO_MANY_ORGS.substitute(count=len(org_slugs)),
                )
            missing_org_slugs = set(relocation.want_org_slugs) - set(org_slugs)
            if len(missing_org_slugs):
                return fail_relocation(
                    relocation,
                    OrderedTask.PREPROCESSING_SCAN,
                    ERR_PREPROCESSING_MISSING_ORGS.substitute(
                        orgs=",".join(sorted(missing_org_slugs))
                    ),
                )

            relocation.want_usernames = sorted(usernames)
            relocation.save()

            # TODO(getsentry/team-ospo#203): The user's import data looks basically okay - we
            # should use this opportunity to send a "your relocation request has been accepted and
            # is in flight, please give it a couple hours" email.

            preprocessing_baseline_config.delay(uuid)
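

# Illustrative sketch only, prompted by the TODO(getsentry/team-ospo#190) notes inside
# `preprocessing_scan` above: a conservative slug pre-check could look roughly like the
# (hypothetical) helper below. The pattern is an assumption for illustration, not the validation
# Sentry actually ships.
#
#     import re
#
#     _SLUG_RE = re.compile(r"^[a-z0-9][a-z0-9-]*$")
#
#     def _looks_like_valid_slug(slug: str) -> bool:
#         return bool(_SLUG_RE.match(slug))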


@instrumented_task(
    name="sentry.relocation.preprocessing_baseline_config",
    queue="relocation",
    max_retries=MAX_FAST_TASK_RETRIES,
    retry_backoff=RETRY_BACKOFF,
    retry_backoff_jitter=True,
    soft_time_limit=PREPROCESSING_TIME_LIMIT,
    silo_mode=SiloMode.REGION,
)
def preprocessing_baseline_config(uuid: str) -> None:
    """
    Pulls down the global config data we'll need to check for collisions and global data integrity.

    This function is meant to be idempotent, and should be retried with an exponential backoff.
    """

    relocation: Optional[Relocation]
    attempts_left: int
    (relocation, attempts_left) = start_relocation_task(
        uuid=uuid,
        step=Relocation.Step.PREPROCESSING,
        task=OrderedTask.PREPROCESSING_BASELINE_CONFIG,
        allowed_task_attempts=MAX_FAST_TASK_ATTEMPTS,
    )
    if relocation is None:
        return

    with retry_task_or_fail_relocation(
        relocation,
        OrderedTask.PREPROCESSING_BASELINE_CONFIG,
        attempts_left,
        ERR_PREPROCESSING_INTERNAL,
    ):
        # TODO(getsentry/team-ospo#203): A very nice optimization here is to only pull this down
        # once a day - if we've already done a relocation today, we should just copy that file
        # instead of doing this (expensive!) global export again.
        fp = BytesIO()
        export_in_config_scope(
            fp,
            encrypt_with=BytesIO(get_public_key_using_gcp_kms(DEFAULT_CRYPTO_KEY_VERSION)),
        )
        fp.seek(0)

        kind = RelocationFile.Kind.BASELINE_CONFIG_VALIDATION_DATA
        file = File.objects.create(name=kind.to_filename("tar"), type=RELOCATION_FILE_TYPE)
        file.putfile(fp, blob_size=RELOCATION_BLOB_SIZE, logger=logger)
        RelocationFile.objects.create(
            relocation=relocation,
            file=file,
            kind=kind.value,
        )

        preprocessing_colliding_users.delay(uuid)


@instrumented_task(
    name="sentry.relocation.preprocessing_colliding_users",
    queue="relocation",
    max_retries=MAX_FAST_TASK_RETRIES,
    retry_backoff=RETRY_BACKOFF,
    retry_backoff_jitter=True,
    soft_time_limit=PREPROCESSING_TIME_LIMIT,
    silo_mode=SiloMode.REGION,
)
def preprocessing_colliding_users(uuid: str) -> None:
    """
    Pulls down any already existing users whose usernames match those found in the import - we'll
    need to validate that none of these are mutated during import.

    This function is meant to be idempotent, and should be retried with an exponential backoff.
    """

    relocation: Optional[Relocation]
    attempts_left: int
    (relocation, attempts_left) = start_relocation_task(
        uuid=uuid,
        step=Relocation.Step.PREPROCESSING,
        task=OrderedTask.PREPROCESSING_COLLIDING_USERS,
        allowed_task_attempts=MAX_FAST_TASK_ATTEMPTS,
    )
    if relocation is None:
        return

    with retry_task_or_fail_relocation(
        relocation,
        OrderedTask.PREPROCESSING_COLLIDING_USERS,
        attempts_left,
        ERR_PREPROCESSING_INTERNAL,
    ):
        fp = BytesIO()
        export_in_user_scope(
            fp,
            encrypt_with=BytesIO(get_public_key_using_gcp_kms(DEFAULT_CRYPTO_KEY_VERSION)),
            user_filter=set(relocation.want_usernames),
        )
        fp.seek(0)

        kind = RelocationFile.Kind.COLLIDING_USERS_VALIDATION_DATA
        file = File.objects.create(name=kind.to_filename("tar"), type=RELOCATION_FILE_TYPE)
        file.putfile(fp, blob_size=RELOCATION_BLOB_SIZE, logger=logger)
        RelocationFile.objects.create(
            relocation=relocation,
            file=file,
            kind=kind.value,
        )

        preprocessing_complete.delay(uuid)


@instrumented_task(
    name="sentry.relocation.preprocessing_complete",
    queue="relocation",
    max_retries=MAX_FAST_TASK_RETRIES,
    retry_backoff=RETRY_BACKOFF,
    retry_backoff_jitter=True,
    soft_time_limit=PREPROCESSING_TIME_LIMIT,
    silo_mode=SiloMode.REGION,
)
def preprocessing_complete(uuid: str) -> None:
    """
    Creates a "composite object" from the uploaded tarball, which could have many pieces. Because
    creating a composite object in this manner is a synchronous operation, we don't need a
    follow-up step confirming success.

    This function is meant to be idempotent, and should be retried with an exponential backoff.
    """

    relocation: Optional[Relocation]
    attempts_left: int
    (relocation, attempts_left) = start_relocation_task(
        uuid=uuid,
        step=Relocation.Step.PREPROCESSING,
        task=OrderedTask.PREPROCESSING_COMPLETE,
        allowed_task_attempts=MAX_FAST_TASK_ATTEMPTS,
    )
    if relocation is None:
        return

    with retry_task_or_fail_relocation(
        relocation,
        OrderedTask.PREPROCESSING_COMPLETE,
        attempts_left,
        ERR_PREPROCESSING_INTERNAL,
    ):
        storage = get_storage()
        for kind in RELOCATION_FILES_TO_BE_VALIDATED:
            raw_relocation_file = (
                RelocationFile.objects.filter(
                    relocation=relocation,
                    kind=kind.value,
                )
                .select_related("file")
                .prefetch_related("file__blobs")
                .first()
            )

            file = raw_relocation_file.file
            path = f'relocations/runs/{uuid}/in/{kind.to_filename("tar")}'
            if isinstance(storage, GoogleCloudStorage):
                # If we're using GCS, rather than performing an expensive copy of the file, just
                # create a composite object.
                storage.client.bucket(storage.bucket_name).blob(path).compose(
                    [b.getfile().blob for b in file.blobs.all()]
                )
            else:
                # In S3 or the local filesystem, no "composite object" API exists, so we fall back
                # to a manual concatenation and copy instead.
                fp = file.getfile()
                fp.seek(0)
                storage.save(path, fp)

        relocation.step = Relocation.Step.VALIDATING.value
        relocation.save()

        validating_start.delay(uuid)


@instrumented_task(
    name="sentry.relocation.validating_start",
    queue="relocation",
    max_retries=MAX_FAST_TASK_RETRIES,
    retry_backoff=RETRY_BACKOFF,
    retry_backoff_jitter=True,
    soft_time_limit=PREPROCESSING_TIME_LIMIT,
    silo_mode=SiloMode.REGION,
)
def validating_start(uuid: str) -> None:
    """
    Calls into Google CloudBuild and kicks off a validation run.

    This function is meant to be idempotent, and should be retried with an exponential backoff.
    """

    # TODO(getsentry/team-ospo#203): Implement this.
    pass