-
Notifications
You must be signed in to change notification settings - Fork 26
Expand file tree
/
Copy pathextensions.py
More file actions
176 lines (153 loc) · 5.41 KB
/
extensions.py
File metadata and controls
176 lines (153 loc) · 5.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# extensions.py
from flask_sqlalchemy import SQLAlchemy
from redis import Redis
from datetime import datetime, timedelta
import json
# Shared SQLAlchemy handle; it is attached to the Flask app in init_extensions().
db = SQLAlchemy()

# Module-wide Redis client used by the helper classes and functions below.
redis_client = Redis(
    host='localhost',        # change to your Redis host if it is not local
    port=6379,               # default Redis port
    db=0,                    # database number
    socket_timeout=5,        # timeout in seconds
    decode_responses=True,   # decode responses to Python strings
)
class RedisTokenManager:
    """Helper class for managing per-user tokens in Redis.

    Tokens are kept in a Redis hash stored under ``user:<client_id>``.
    Every method is best-effort: any exception is printed and reported
    through the return value instead of being raised to the caller.
    """

    @staticmethod
    def store_tokens(client_id, access_token, feed_token, expiry=86400):
        """Store a user's tokens in Redis with an expiration.

        Args:
            client_id: Identifier used to build the ``user:<client_id>`` key.
            access_token: Value written to the ``access_token`` hash field.
            feed_token: Value written to the ``feed_token`` hash field.
            expiry: Time-to-live applied to the key, in seconds.

        Returns:
            True when the write succeeded, False if any error occurred.
        """
        try:
            user_key = f"user:{client_id}"
            token_data = {
                "access_token": access_token,
                "feed_token": feed_token,
                "created_at": datetime.now().timestamp(),
            }
            # Send the hash write and its TTL together in one transactional
            # pipeline so a failure between the two commands cannot leave the
            # tokens stored without an expiry. hset(mapping=...) replaces the
            # deprecated hmset().
            with redis_client.pipeline() as pipe:
                pipe.hset(user_key, mapping=token_data)
                pipe.expire(user_key, expiry)
                pipe.execute()
            return True
        except Exception as e:
            print(f"Error storing tokens in Redis: {str(e)}")
            return False

    @staticmethod
    def get_tokens(client_id):
        """Get a user's tokens from Redis.

        Args:
            client_id: Identifier used to build the ``user:<client_id>`` key.

        Returns:
            A dict with the ``access_token`` and ``feed_token`` fields, or
            None when Redis returns no fields for the key or an error occurs.
            The stored ``created_at`` field is not included.
        """
        try:
            user_key = f"user:{client_id}"
            token_data = redis_client.hgetall(user_key)
            if not token_data:
                return None
            return {
                "access_token": token_data.get("access_token"),
                "feed_token": token_data.get("feed_token"),
            }
        except Exception as e:
            print(f"Error getting tokens from Redis: {str(e)}")
            return None

    @staticmethod
    def clear_tokens(client_id):
        """Delete a user's token hash from Redis.

        Args:
            client_id: Identifier used to build the ``user:<client_id>`` key.

        Returns:
            True when the delete call completed, False if any error occurred.
        """
        try:
            user_key = f"user:{client_id}"
            redis_client.delete(user_key)
            return True
        except Exception as e:
            print(f"Error clearing tokens from Redis: {str(e)}")
            return False
class RedisWatchlistManager:
    """Helper class for managing watchlist data in Redis.

    Watchlists are stored as a JSON string under ``user:<user_id>:watchlists``.
    Both methods are best-effort: any exception is printed and reported
    through the return value instead of being raised to the caller.
    """

    @staticmethod
    def store_watchlist(user_id, watchlist_data, expiry=86400):
        """Store watchlist data in Redis with an expiration.

        Args:
            user_id: Identifier used to build the ``user:<user_id>:watchlists`` key.
            watchlist_data: JSON-serialisable value to store.
            expiry: Time-to-live applied to the key, in seconds.

        Returns:
            True when the write succeeded, False if any error occurred
            (including a value that ``json.dumps`` cannot serialise).
        """
        try:
            key = f"user:{user_id}:watchlists"
            # setex writes the value and its TTL in one command, so the key
            # can never be left without an expiry; this also matches
            # RedisOrderManager.store_order_data.
            redis_client.setex(key, expiry, json.dumps(watchlist_data))
            return True
        except Exception as e:
            print(f"Error storing watchlist in Redis: {str(e)}")
            return False

    @staticmethod
    def get_watchlist(user_id):
        """Get watchlist data from Redis.

        Args:
            user_id: Identifier used to build the ``user:<user_id>:watchlists`` key.

        Returns:
            The JSON-decoded value, or None when nothing is stored under the
            key or an error occurs.
        """
        try:
            key = f"user:{user_id}:watchlists"
            data = redis_client.get(key)
            return json.loads(data) if data else None
        except Exception as e:
            print(f"Error getting watchlist from Redis: {str(e)}")
            return None
def init_extensions(app):
    """Bind the shared extensions to ``app`` and probe the Redis connection.

    A failed Redis ping is only reported on stdout; it does not stop
    application start-up.
    """
    db.init_app(app)

    # Probe Redis once so connection problems are visible at start-up.
    try:
        redis_client.ping()
        print("Redis connection successful")
    except Exception as err:
        # Deliberately non-fatal. Depending on your needs you may prefer to
        # fall back to a different cache or raise an error here.
        print(f"Redis connection failed: {str(err)}")
def cleanup_expired_sessions():
    """Delete every ``user:*`` key whose reported TTL is zero or negative.

    Errors are printed and swallowed; the function returns None either way.

    NOTE(review): per the Redis TTL command, -1 means "no expiry" and -2 means
    "key does not exist", so this presumably also removes keys that were
    stored without a TTL -- confirm that is intended.
    """
    try:
        for session_key in redis_client.scan_iter(match="user:*"):
            remaining = redis_client.ttl(session_key)
            if remaining > 0:
                # Still has time left; keep it.
                continue
            redis_client.delete(session_key)
    except Exception as err:
        print(f"Error cleaning up sessions: {str(err)}")
def get_market_status():
    """Return the JSON-decoded value stored under ``market_status``.

    Returns None when the key holds nothing or when any error occurs
    (the error is printed).
    """
    try:
        raw_status = redis_client.get('market_status')
        if not raw_status:
            return None
        return json.loads(raw_status)
    except Exception as err:
        print(f"Error getting market status: {str(err)}")
        return None
# Custom exception type for Redis operations
class RedisError(Exception):
    """Exception type for signalling a failed Redis operation.

    It adds nothing to ``Exception``; it exists so callers can raise or
    catch a Redis-specific error.
    """
# Redis health check
def check_redis_health():
    """Ping Redis and describe the outcome as a dict.

    Returns:
        ``{'status': 'healthy', 'timestamp': ...}`` when the ping succeeds,
        otherwise ``{'status': 'unhealthy', 'error': ..., 'timestamp': ...}``.
        ``timestamp`` is ``datetime.now().isoformat()``.
    """
    try:
        redis_client.ping()
    except Exception as err:
        report = {'status': 'unhealthy', 'error': str(err)}
    else:
        report = {'status': 'healthy'}
    # The timestamp goes last so the key order matches in both outcomes.
    report['timestamp'] = datetime.now().isoformat()
    return report
# Redis helpers for order management
class RedisOrderManager:
    """Helper class for managing order data in Redis.

    Orders are stored as a JSON string under ``order:<order_id>``. Both
    methods print and swallow any exception, reporting failure through the
    return value.
    """

    @staticmethod
    def store_order_data(order_id, order_data, expiry=86400):
        """Write ``order_data`` as JSON under ``order:<order_id>`` with a TTL.

        Args:
            order_id: Identifier used to build the key.
            order_data: JSON-serialisable value to store.
            expiry: Time-to-live for the key, in seconds.

        Returns:
            True on success, False if any error occurred.
        """
        try:
            order_key = f"order:{order_id}"
            payload = json.dumps(order_data)
            redis_client.setex(order_key, expiry, payload)
        except Exception as err:
            print(f"Error storing order data in Redis: {str(err)}")
            return False
        return True

    @staticmethod
    def get_order_data(order_id):
        """Read and JSON-decode the value stored under ``order:<order_id>``.

        Returns:
            The decoded value, or None when nothing is stored or any error
            occurred.
        """
        try:
            raw = redis_client.get(f"order:{order_id}")
            if not raw:
                return None
            return json.loads(raw)
        except Exception as err:
            print(f"Error getting order data from Redis: {str(err)}")
            return None