-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrandom_user_fetcher.py
More file actions
73 lines (57 loc) · 2.09 KB
/
random_user_fetcher.py
File metadata and controls
73 lines (57 loc) · 2.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import os
import random
import time
from databunkerpro import DatabunkerproAPI
# Get credentials from environment
api_url = os.getenv("DATABUNKER_API_URL", "http://localhost")
api_token = os.getenv("DATABUNKER_API_TOKEN", "dcc33285-4bfd-6e3b-eeb8-05e879afa943")
tenant_name = os.getenv("DATABUNKER_TENANT_NAME", "")
def read_tokens_from_file(filename="user_tokens.txt"):
"""Read tokens from file and return as a list."""
try:
with open(filename, "r") as f:
return [line.strip() for line in f if line.strip()]
except FileNotFoundError:
print(f"Error: {filename} not found")
return []
def fetch_random_user(api, tokens):
"""Fetch a random user record from the list of tokens."""
if not tokens:
print("No tokens available")
return None
token = random.choice(tokens)
try:
result = api.get_user("token", token)
if result and result.get("status") == "ok":
return result
else:
print(
f"Error fetching token {token}: {result.get('message', 'Unknown error')}"
)
return None
except Exception as e:
print(f"Exception fetching token {token}: {str(e)}")
return None
def main():
if not all([api_token]):
print("Error: DATABUNKER_API_TOKEN environment variable must be set")
return
# Initialize API client
api = DatabunkerproAPI(api_url, api_token, tenant_name)
# Read tokens from file
tokens = read_tokens_from_file()
if not tokens:
return
print(f"Found {len(tokens)} tokens")
# Fetch random users
num_fetches = 5 # Number of random fetches to perform
print(f"\nFetching {num_fetches} random users:")
for i in range(num_fetches):
print(f"\nFetch {i + 1}/{num_fetches}:")
user = fetch_random_user(api, tokens)
if user:
print(f"Successfully fetched user with token: {user.get('token', 'N/A')}")
print(f"User data: {user.get('profile', {})}")
time.sleep(1) # Small delay between requests
if __name__ == "__main__":
main()