from threading import RLock from urllib2 import URLError,urlopen import gtk class AvCache: """ Store a cache of user data we've already downloaded. This cache will be accessed by a number of threads, so it includes a lock as well. """ class __impl: """ Implementation of the singleton interface """ def __init__(self): self.lock = RLock() self.map = {} # storage for the instance reference __instance = None def __init__(self): """ Create singleton instance """ # Check whether we already have an instance if AvCache.__instance is None: # Create and remember instance AvCache.__instance = AvCache.__impl() # Store instance reference as the only member in the handle self.__dict__['_AvCache__instance'] = AvCache.__instance def __getattr__(self, attr): """ Delegate access to implementation """ return getattr(self.__instance, attr) def __setattr__(self, attr, value): """ Delegate access to implementation """ return setattr(self.__instance, attr, value) # end class AvCache class NameCache: """ Store a cache of user names we've already downloaded. This cache will be accessed by a number of threads, so it includes a lock as well. """ class __impl: """ Implementation of the singleton interface """ def __init__(self): self.lock = RLock() self.map = {} # storage for the instance reference __instance = None def __init__(self): """ Create singleton instance """ # Check whether we already have an instance if NameCache.__instance is None: # Create and remember instance NameCache.__instance = NameCache.__impl() # Store instance reference as the only member in the handle self.__dict__['_NameCache__instance'] = NameCache.__instance def __getattr__(self, attr): """ Delegate access to implementation """ return getattr(self.__instance, attr) def __setattr__(self, attr, value): """ Delegate access to implementation """ return setattr(self.__instance, attr, value) # end class NameCache def add_to_av_cache(user): """ This helping function takes a python-twitter User object, grabs the image and from the image_url and adds it to the cache, along with the user's full name, with the screen_name as the key. """ with AvCache().lock: hit = AvCache().map.has_key(user.screen_name) if not hit: try: url = urlopen(user.profile.image_url) data = url.read() loader = gtk.gdk.PixbufLoader() loader.write(str(data)) image = loader.get_pixbuf() loader.close() with AvCache().lock: AvCache().map[user.screen_name] = image except (URLError, ValueError): pass def add_to_name_cache(user, api=None): """ This helping function takes a python-twitter User object and saves the user's full name to the cache. If an API is passed in and the name in 'user' is false, we can grab the UserInfo for the user and get the name directly from that. """ with NameCache().lock: hit = NameCache().map.has_key(user.screen_name) if not hit: NameCache().map[user.screen_name] = user.name name = NameCache().map[user.screen_name] # Now let's look at the name we've got, and see if we need a better one if not name: with api.lock: try: new_user = api.GetUser(user.screen_name) except (HTTPError, URLError): new_user = None if new_user is not None: with NameCache().lock: NameCache().map[user.screen_name] = new_user.name