-
Notifications
You must be signed in to change notification settings - Fork 2.7k
.3493622109966576:a197018d5c60a54a51b49ca15d6fb0b3_69e7125e81ef3ff6db540126.69e7462981ef3ff6db540408.69e74628d6025e57c275fa6c:Trae CN.T(2026/4/21 17:40:57) #4042
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -60,6 +60,7 @@ | |
| WatchError, | ||
| ) | ||
| from redis.lock import Lock | ||
| from redis.local_cache import LocalCache | ||
| from redis.maint_notifications import ( | ||
| MaintNotificationsConfig, | ||
| OSSMaintNotificationsHandler, | ||
|
|
@@ -290,6 +291,8 @@ def __init__( | |
| oss_cluster_maint_notifications_handler: Optional[ | ||
| OSSMaintNotificationsHandler | ||
| ] = None, | ||
| local_cache_ttl: int = 0, | ||
| local_cache_max_size: int = 0, | ||
| ) -> None: | ||
| """ | ||
| Initialize a new Redis client. | ||
|
|
@@ -342,6 +345,16 @@ def __init__( | |
| `redis.maint_notifications.OSSMaintNotificationsHandler` for details. | ||
| Only supported with RESP3 | ||
| Argument is ignored when connection_pool is provided. | ||
| local_cache_ttl: | ||
| **Experimental.** Local memory cache TTL in milliseconds. | ||
| If both `local_cache_ttl` and `local_cache_max_size` are greater than 0, | ||
| the local memory cache layer will be activated for GET commands. | ||
| This is an experimental feature for reducing hot key read latency. | ||
| local_cache_max_size: | ||
| **Experimental.** Maximum number of keys to store in local memory cache. | ||
| If both `local_cache_ttl` and `local_cache_max_size` are greater than 0, | ||
| the local memory cache layer will be activated. | ||
| LRU eviction is used when cache exceeds this size. | ||
| """ | ||
| if event_dispatcher is None: | ||
| self._event_dispatcher = EventDispatcher() | ||
|
|
@@ -491,6 +504,10 @@ def __init__( | |
| else: | ||
| self.response_callbacks.update(_RedisCallbacksRESP2) | ||
|
|
||
| self._local_cache: Optional[LocalCache] = None | ||
| if local_cache_ttl > 0 and local_cache_max_size > 0: | ||
| self._local_cache = LocalCache(local_cache_ttl, local_cache_max_size) | ||
|
|
||
| def __repr__(self) -> str: | ||
| return ( | ||
| f"<{type(self).__module__}.{type(self).__name__}" | ||
|
|
@@ -774,6 +791,31 @@ def _execute_command(self, *args, **options): | |
| """Execute a command and return a parsed response""" | ||
| pool = self.connection_pool | ||
| command_name = args[0] | ||
| conn = None | ||
|
|
||
| if self._local_cache is not None: | ||
| if isinstance(command_name, bytes): | ||
| try: | ||
| command_upper = command_name.decode('utf-8').upper() | ||
| except UnicodeDecodeError: | ||
| command_upper = command_name.decode('latin-1').upper() | ||
| else: | ||
| command_upper = command_name.upper() | ||
|
|
||
| if command_upper == "GET" and len(args) >= 2: | ||
| key = args[1] | ||
| cached_value = self._local_cache.get(key) | ||
| if cached_value is not None: | ||
| return cached_value | ||
|
|
||
| elif command_upper in ("SET", "SETEX", "DEL") and len(args) >= 2: | ||
| if command_upper == "DEL": | ||
| for key in args[1:]: | ||
| self._local_cache.delete(key) | ||
| else: | ||
| key = args[1] | ||
| self._local_cache.delete(key) | ||
|
|
||
| conn = self.connection or pool.get_connection() | ||
|
|
||
| # Start timing for observability | ||
|
|
@@ -805,6 +847,19 @@ def failure_callback(error, failure_count): | |
| server_port=getattr(conn, "port", None), | ||
| db_namespace=str(conn.db), | ||
| ) | ||
|
|
||
| if self._local_cache is not None: | ||
| if isinstance(command_name, bytes): | ||
| try: | ||
| command_upper = command_name.decode('utf-8').upper() | ||
| except UnicodeDecodeError: | ||
| command_upper = command_name.decode('latin-1').upper() | ||
| else: | ||
| command_upper = command_name.upper() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Redundant
|
||
| if command_upper == "GET" and len(args) >= 2: | ||
| key = args[1] | ||
| self._local_cache.set(key, result) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cached
|
||
|
|
||
| return result | ||
| except Exception as e: | ||
| record_error_count( | ||
|
|
@@ -824,7 +879,7 @@ def failure_callback(error, failure_count): | |
| conn.connect() | ||
| if self._single_connection_client: | ||
| self.single_connection_lock.release() | ||
| if not self.connection: | ||
| if not self.connection and conn: | ||
| pool.release(conn) | ||
|
|
||
| def parse_response(self, connection, command_name, **options): | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,139 @@ | ||
| import threading | ||
| import time | ||
| from collections import OrderedDict | ||
| from typing import Any, Optional | ||
|
|
||
|
|
||
| class LocalCache: | ||
| """ | ||
| 线程安全的本地内存缓存,支持 TTL 和 LRU 淘汰策略。 | ||
|
|
||
| 此缓存用于降低对远端 Redis 的高频读取延迟,特别适合 Hot Key 场景。 | ||
|
|
||
| Attributes: | ||
| ttl_ms: 缓存条目存活时间(毫秒) | ||
| max_size: 缓存最大条目数量 | ||
| _cache: 内部存储,使用 OrderedDict 实现 LRU | ||
| _lock: 线程锁 | ||
| """ | ||
|
|
||
| def __init__(self, ttl_ms: int, max_size: int): | ||
| """ | ||
| 初始化本地缓存。 | ||
|
|
||
| Args: | ||
| ttl_ms: 缓存存活时间(毫秒),必须大于 0 | ||
| max_size: 最大缓存条目数量,必须大于 0 | ||
|
|
||
| Raises: | ||
| ValueError: 如果 ttl_ms 或 max_size 小于等于 0 | ||
| """ | ||
| if ttl_ms <= 0: | ||
| raise ValueError("ttl_ms must be greater than 0") | ||
| if max_size <= 0: | ||
| raise ValueError("max_size must be greater than 0") | ||
|
|
||
| self.ttl_ms = ttl_ms | ||
| self.max_size = max_size | ||
| self._cache: OrderedDict = OrderedDict() | ||
| self._lock = threading.RLock() | ||
|
|
||
| def get(self, key: Any) -> Optional[Any]: | ||
| """ | ||
| 从缓存中获取值。 | ||
|
|
||
| 如果缓存命中且未过期,返回对应值并更新 LRU 顺序。 | ||
| 如果缓存未命中或已过期,返回 None。 | ||
|
|
||
| Args: | ||
| key: 缓存键 | ||
|
|
||
| Returns: | ||
| 缓存值或 None | ||
| """ | ||
| with self._lock: | ||
| if key not in self._cache: | ||
| return None | ||
|
|
||
| entry = self._cache[key] | ||
| current_time = time.time() * 1000 | ||
|
|
||
| if current_time - entry["timestamp"] > self.ttl_ms: | ||
| del self._cache[key] | ||
| return None | ||
|
|
||
| self._cache.move_to_end(key) | ||
| return entry["value"] | ||
|
|
||
| def set(self, key: Any, value: Any) -> None: | ||
| """ | ||
| 设置缓存值。 | ||
|
|
||
| 如果缓存已达到最大容量,会淘汰最久未使用的条目(LRU)。 | ||
|
|
||
| Args: | ||
| key: 缓存键 | ||
| value: 缓存值 | ||
| """ | ||
| with self._lock: | ||
| if key in self._cache: | ||
| del self._cache[key] | ||
| elif len(self._cache) >= self.max_size: | ||
| self._cache.popitem(last=False) | ||
|
|
||
| self._cache[key] = { | ||
| "value": value, | ||
| "timestamp": time.time() * 1000 | ||
| } | ||
|
|
||
| def delete(self, key: Any) -> bool: | ||
| """ | ||
| 删除缓存条目。 | ||
|
|
||
| Args: | ||
| key: 缓存键 | ||
|
|
||
| Returns: | ||
| 如果键存在并被删除返回 True,否则返回 False | ||
| """ | ||
| with self._lock: | ||
| if key in self._cache: | ||
| del self._cache[key] | ||
| return True | ||
| return False | ||
|
|
||
| def clear(self) -> int: | ||
| """ | ||
| 清空所有缓存条目。 | ||
|
|
||
| Returns: | ||
| 被清除的条目数量 | ||
| """ | ||
| with self._lock: | ||
| count = len(self._cache) | ||
| self._cache.clear() | ||
| return count | ||
|
|
||
| def __len__(self) -> int: | ||
| """ | ||
| 返回缓存中的条目数量。 | ||
|
|
||
| 注意:此方法不会清理过期条目,仅返回当前存储的条目数。 | ||
|
|
||
| Returns: | ||
| 缓存中的条目数量 | ||
| """ | ||
| with self._lock: | ||
| return len(self._cache) | ||
|
|
||
| def __contains__(self, key: Any) -> bool: | ||
| """ | ||
| 检查键是否存在且未过期。 | ||
|
|
||
| Args: | ||
| key: 缓存键 | ||
|
|
||
| Returns: | ||
| 如果键存在且未过期返回 True,否则返回 False | ||
| """ | ||
| return self.get(key) is not None |


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Many write commands don't invalidate the local cache
High Severity
Cache invalidation only covers
SET,SETEX, andDEL, but many other write commands (INCR,DECRBY,APPEND,GETSET,GETDEL,MSET,SETRANGE,INCRBYFLOAT, etc.) all flow throughexecute_commandand modify key values without invalidating the local cache. A sequence liker.get("counter")followed byr.incr("counter")thenr.get("counter")will silently return the stale cached value, producing incorrect results with no indication of error.Reviewed by Cursor Bugbot for commit 8d310b6. Configure here.