编程是一门艺术

raptor.zh(at)gmail.com Creative Commons License
本作品采用知识共享署名-非商业性使用-相同方式共享 2.5 中国大陆许可协议进行许可。

archives 存档

01 Jan - 31 Dec 2017
01 Jan - 31 Dec 2016
01 Jan - 31 Dec 2015
01 Jan - 31 Dec 2014
01 Jan - 31 Dec 2013
01 Jan - 31 Dec 2012
01 Jan - 31 Dec 2011
01 Jan - 31 Dec 2010
01 Jan - 31 Dec 2009
01 Jan - 31 Dec 2008
01 Jan - 31 Dec 2007
01 Jan - 31 Dec 2006
01 Jan - 31 Dec 2005
01 Jan - 31 Dec 2004
01 Jan - 31 Dec 2003
01 Jan - 31 Dec 2002
01 Jan - 31 Dec 2001
01 Jan - 31 Dec 2000
01 Jan - 31 Dec 1999

--

links 链接

--

一个Redis Cache实现

需求

应用中需要通过HTTP调用远程的数据,但是这个获取过程需要执行较长时间,而且这个数据本身的变化也不频繁,这种情况最适合用一个cache来优化。

前两年在做短链接实现的时候,曾经用最好的语言PHP做过一个Redis cache实现《一个简单的Redis应用(修订版)》,但那个毕竟是一个特定的实现,而且我现在需要的是python版。

这次的目标是需要实现一个比较通用的cache,支持各种数据类型,有超时更新机制,超时更新需要有锁(防止前文那个例子里发生过的问题)。

代码(py3)

#!/usr/bin/env python
# -*- coding: utf-8 -*-


import hashlib
import pickle
from functools import wraps

from redis import Redis

import logging


__author__ = 'raptor'

logger = logging.getLogger(__name__)


class RedisCache(object):
    MAX_EXPIRES = 86400
    SERIALIZER = pickle
    LOCKER = set()

    def __init__(self, name, host='localhost', port=6379, db=0, max_expires=MAX_EXPIRES):
        self.db = Redis(host=host, port=port, db=db)
        self.name = name
        self.max_expires = max_expires

    def _getkey(self, *keys):
        return ":".join([self.name] + list(keys))

    def _get_data(self, key):
        result = self.db.get(key)
        return None if result == b'None' else result

    def get(self, key):
        result = self._get_data(self._getkey(key))
        return self.SERIALIZER.loads(result) if result is not None else result

    def set(self, key, value, ex=None):
        k = self._getkey(key)
        v = self.SERIALIZER.dumps(value)
        if ex is None:
            self.db.set(k, v)
        else:
            self.db.setex(k, v, ex)

    def delete(self, key):
        self.db.delete(self._getkey(key))

    @staticmethod
    def build_key(name, *args, **kwargs):
        m = hashlib.md5()
        m.update(name.encode('utf-8'))
        m.update(pickle.dumps(args))
        m.update(pickle.dumps(kwargs))
        return m.hexdigest()

    def cached(self, key, func, ex=None):
        if ex is None:
            ex = self.max_expires
        min_ttl = self.max_expires - ex  # ex <= 0 : force refresh data
        key = ":".join([self.name, key])
        result = self._get_data(key)
        if key not in self.LOCKER:
            self.LOCKER.add(key)
            try:
                ttl = self.db.ttl(key)
                if ttl is None or ttl < min_ttl:
                    result = func()
                    if result is not None:
                        result = self.SERIALIZER.dumps(result)
                    self.db.setex(key, result, self.max_expires)
            finally:
                self.LOCKER.remove(key)
        try:
            result = self.SERIALIZER.loads(result) if result is not None else None
        except:
            pass
        return result


def redis_cached(db, ex=None):
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            key = RedisCache.build_key(fn.__name__, *args, **kwargs)
            return db.cached(key, lambda: fn(*args, **kwargs), ex)
        return wrapper
    return decorator

用法

RedisCache本身也可以当一个Redis数据库对象使用,比如:

db = RedisCache('tablename', max_expires=3600)  #  tablename是一个是自定义的key前缀,可以用于当作表名使用。
# 最大超时时间(max_expires)仅供cached使用,使用set时,如果不指定超时时间则永不超时
db.set('aaa', {'key': 1234}, 7200)  # value可以是作何可序列化数据类型,比如字典,不指定超时则永不超时
db.get('aaa')['key']  # 结果为1234
db.delete('aaa')

但这个不是重点,重点是cached功能。对于慢速函数,加上db.cached以后,可以对函数调用的结果进行cache,在cache有效的情况下,大幅提高函数在反复调用时的性能。

下面是一个例子,具体见代码中的注释:

db = RedisCache('tablename')

def func(url, **kwargs):
    result = requests.get("?".join([url, urlencode(kwargs)]))
    return result

url = 'https://www.baidu.com/s'
t = time()
func(url, wd="测试")
print(time()-t)  # 较慢
t = time()
db.cached('test_cache', lambda: func(url, wd="测试"), 10)
print(time()-t)  # 第一次运行仍然较慢
t = time()
db.cached('test_cache', lambda: func(url, wd="测试"), 10)
print(time()-t)  # 从redis里读取cache很快
sleep(11)  # 等待到超时
t = time()
db.cached('test_cache', lambda: func(url, wd="测试"), 10)
print(time()-t)  # 超时后会再次执行func更新cache
t = time()
db.cached('test_cache_new', lambda: func(url, wd="新的测试"))
print(time()-t)  # 不同的调用参数用不同的key作cache
t = time()

因为对于不同的函数调用参数,函数可能有不同的返回结果,所以应该用不同的key进行cache。为简单起见,可以把函数签名做一个HASH,然后以此为KEY进行cache。最后把这个操作做成一个decorator,这样,只需要给函数加上这个decorator即可自动提供所需要的cache支持。

最终的简单用法如下:

db = RedisCache('tablename')

@redis_cached(db, 10)
def func(url, **kwargs):
    result = requests.get("?".join([url, urlencode(kwargs)]))
    return result

t = time()
func(url, wd="测试")
print(time()-t)
t = time()
func(url, wd="测试")
print(time()-t)
sleep(11)
t = time()
func(url, wd="测试")
print(time()-t)
t = time()
func(url, wd="新的测试")
print(time()-t)

是不是简单得多了。

推送到[go4pro.org]

Trackback link:

Please enable javascript to generate a trackback url

No trackbacks

评论(0)


 
   
 
  表情图标 

 


提示: 除了 <b> 和 <i> 之外,其他的Html标签都将从您的评论中去除.url或mail地址会被自动加上链接.