123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131 |
- #!/usr/bin/env python
- from __future__ import annotations
- from gzip import GzipFile
- from http.client import HTTPResponse, HTTPSConnection
- from io import BytesIO
- from time import time
- from typing import Any
- from urllib.parse import urlencode
- import click
- from sentry.utils import json
# Describes the expected format of the `merging` input file.
# NOTE(review): the only reference to this string in this file (`help=help` on
# the `merging` argument) is commented out, so this text is currently not shown
# to the user. The name also shadows the builtin `help` at module scope.
help = """
A file with entries separated by newlines, with each entry taking the form:
<USER_BEING_MERGED_ID> -> <USER_MERGING_INTO_ID>. For example, the following file will result in
user 1 being merged into user 2, and 3 into 4:
1 -> 2
3 -> 4
"""
@click.command(
    help="Merge many users at a time using sequential POST calls. The output will be a multiline list indicating success or failure."
)
@click.argument(
    "merging",
    type=click.File("r"),
    # help=help,
)
@click.option(
    "--api",
    required=True,
    help="The Sentry API server to hit. Ex: `sentry.io`.",
)
@click.option(
    "--csrf",
    required=True,
    help="The X-CSRF token.",
)
@click.option(
    "--cookie",
    required=True,
    help="The superadmin cookie, copied verbatim.",
)
def merge_users(merging, api: str, csrf: str, cookie: str):
    """
    Merge users in bulk, one POST request per entry of the `merging` file.

    Each non-blank line of `merging` must have the form
    `<USER_BEING_MERGED_ID> -> <USER_MERGING_INTO_ID>`. The whole file is parsed
    before the first request is sent, so a malformed entry aborts the run before
    any merge is attempted.

    Progress counters are printed at most once every 5 seconds while requests
    are in flight. A final summary and a per-merge status list are always
    printed, even when a request raises part-way through the batch.

    Raises:
        click.BadParameter: if a non-blank line is not two integers separated
            by `->`.
    """
    # Validate file format. Nothing is sent until every entry has parsed.
    calls = []
    with merging as fp:
        for line_no, line in enumerate(fp, start=1):
            entry = line.strip()
            if not entry:
                # Tolerate blank lines (e.g. a trailing newline at end of file).
                continue
            from_user, arrow, into_user = entry.partition("->")
            try:
                if not arrow:
                    raise ValueError("missing `->` separator")
                # `int` ignores surrounding whitespace, so no explicit strip is needed.
                calls.append((int(from_user), int(into_user)))
            except ValueError:
                raise click.BadParameter(
                    f"line {line_no}: expected `<FROM_ID> -> <INTO_ID>`, got {entry!r}",
                    param_hint="merging",
                ) from None

    results = []
    success = 0

    def progress() -> str:
        return f"attempted: {len(results)}, succeeded: {success}, failed: {len(results) - success}"

    # Setup the connection. A single connection is shared by every request.
    conn = HTTPSConnection(api)
    heartbeat = int(time())
    try:
        # Make actual requests, one at a time.
        for merging_from_user_id, merging_into_user_id in calls:
            now = int(time())
            if now - heartbeat > 5:
                heartbeat = now
                print(progress())
            result = do_merge(conn, merging_from_user_id, merging_into_user_id, api, csrf, cookie)
            results.append(f"{merging_from_user_id} -> {merging_into_user_id}: {result}")
            if 200 <= result < 300:
                success += 1
    finally:
        conn.close()
        # Report in `finally` so that, if a request fails mid-batch, the operator
        # still sees which merges were already applied before the error surfaces.
        print(progress())
        print("\n\n")
        print("\n".join(results))
def make_headers(csrf: str, cookie: str) -> dict[str, str]:
    """
    Build the HTTP request headers for an API call.

    The header set imitates a desktop Chrome browser; `cookie` is placed in the
    `Cookie` header and `csrf` in the `X-Csrftoken` header.
    """
    browser_user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
    # Kept as an ordered pair list so the resulting dict preserves header order.
    header_pairs = [
        ("Accept", "application/json; charset=utf-8"),
        ("Accept-Encoding", "gzip, deflate, br, zstd"),
        ("Accept-Language", "en-US,en;q=0.9"),
        ("Cache-Control", "no-cache"),
        ("Content-Type", "application/json"),
        ("Cookie", cookie),
        ("Pragma", "no-cache"),
        ("Priority", "u=1, i"),
        ("User-Agent", browser_user_agent),
        ("X-Csrftoken", csrf),
    ]
    return dict(header_pairs)
def do_merge(
    conn: HTTPSConnection,
    merging_from_user_id: int,
    merging_into_user_id: int,
    api: str,
    csrf: str,
    cookie: str,
) -> int:
    """
    Perform a single merge.

    POSTs `{"users": [merging_from_user_id]}` to the `merge-accounts` endpoint
    of `merging_into_user_id` over `conn` and returns the HTTP status code of
    the response. `api` is only used for the log line; the host that is
    actually contacted is the one `conn` was created with.
    """
    # Set inputs
    endpoint = f"/api/0/users/{merging_into_user_id}/merge-accounts/"
    print(
        f"merging user {merging_from_user_id} into {merging_into_user_id} at https://{api}{endpoint}"
    )

    # Make the underlying request. The request target is the origin-form path:
    # the connection already determines scheme and host, and http.client
    # derives the Host header from it.
    conn.request(
        "POST",
        endpoint,
        headers=make_headers(csrf, cookie),
        body=json.dumps({"users": [merging_from_user_id]}),
    )
    response = conn.getresponse()
    try:
        # The body itself is not needed, but it must be fully consumed before
        # the same connection can be reused for the next request; otherwise
        # leftover bytes may be misread as the start of the next response.
        response.read()
    finally:
        response.close()
    return response.status
# Script entry point: invoke the click command when the file is run directly
# (not when it is imported as a module).
if __name__ == "__main__":
    merge_users()
|