# Python module that handles calls to the twitter API as a separate thread
#
# Every network call runs in its own daemon thread (see ApiThread and its
# subclasses).  Results are handed back either through the owning pane
# or through a GObject signal emitted by a SigProxy.

import re
import gtk
import gobject
from threading import Thread, RLock
from twitter import Api
from urllib2 import HTTPError, URLError, urlopen
from avcache import AvCache


class CustomApi(Api):
    """
    This is a Twitter API with an RLock for multi-threaded access.

    Also included is logic for processing the list names when the
    object is instantiated: a GetUserLists thread is started from
    __init__, and once it reports back, the sorted list names are
    re-emitted on self.sig_proxy as 'lists-ready'.
    """

    def __init__(self, username, password):
        Api.__init__(self, username, password)
        self.lock = RLock()
        self.sig_proxy = SigProxy()
        self.username = username

        thread = GetUserLists(api=self)
        thread.sig_proxy.connect('lists-ready', self.on_lists_ready)
        thread.start()

    def on_lists_ready(self, widget, lists, ignored):
        """Signal handler: forward the sorted list names with our username."""
        list_names = sorted([entry.name for entry in lists['lists']])
        self.sig_proxy.emit('lists-ready', self.username, list_names)
# End class CustomApi


class ApiThread(Thread):
    """Base class for the worker threads: a daemon thread holding an api."""

    def __init__(self, api):
        Thread.__init__(self)
        self.api = api
        self.setDaemon(True)
# End class ApiThread


class GetTweets(ApiThread):
    """Fetch the statuses for one feed name and hand them to a pane."""

    def __init__(self, api, list_name, pane, num_entries, username):
        ApiThread.__init__(self, api)
        self.list_name = list_name
        self.pane = pane
        self.num_entries = num_entries
        self.username = username

    def run(self):
        name = self.list_name
        count = self.num_entries

        with self.api.lock:
            try:
                # username/Home entries need to load the appropriate Home feed
                if name == self.username + '/Home':
                    statuses = self.api.GetHomeTimeline(count=count)

                # For @username, check if it is one of our usernames, or
                # just needs to be searched on
                elif name == '@' + self.username:
                    statuses = self.api.GetMentions(count=count)
                elif name.startswith('@'):
                    statuses = results_to_statuses(
                        self.api.Search(name, rpp=count))

                # Direct Messages should match like /Home, above
                elif name == self.username + '/Direct Messages':
                    statuses = dms_to_statuses(self.api.GetDirectMessages())

                # User lookups go straight to the user
                elif name.startswith('user: '):
                    statuses = self.api.GetUserTimeline(
                        name[len('user: '):], count=count)

                # Lists load the appropriate list from the appropriate account
                elif name.startswith('list: '):
                    real_list = re.sub(r'list: .*/(.*)', r'\1', name)
                    statuses = self.api.GetListStatuses(
                        real_list, per_page=count)

                # Everything else is a straight search
                else:
                    statuses = results_to_statuses(
                        self.api.Search(name, rpp=count))

            except (HTTPError, URLError):
                # The pane is still updated below, with None
                statuses = None

        # For each user id present, populate the AvCache with an image
        if statuses:
            for status in statuses:
                add_to_cache(status)

        gtk.gdk.threads_enter()
        try:
            self.pane.update_window(statuses)
        finally:
            gtk.gdk.threads_leave()
### End class GetTweets


class GetSingleTweet(ApiThread):
    """Fetch one status by id and hand it to a pane as a one-item list."""

    def __init__(self, api, pane, single_tweet):
        ApiThread.__init__(self, api)
        self.pane = pane
        self.single_tweet = single_tweet

    def run(self):
        statuses = []
        with self.api.lock:
            try:
                statuses.append(self.api.GetStatus(self.single_tweet))
            except (HTTPError, URLError):
                statuses = None

        # In case we've never seen this user, grab their profile image.
        # statuses is None after a failed fetch, so it must not be
        # iterated unconditionally.
        if statuses:
            for status in statuses:
                add_to_cache(status)

        gtk.gdk.threads_enter()
        try:
            self.pane.update_window(statuses)
        finally:
            gtk.gdk.threads_leave()
### End class GetSingleTweet


class GetFollowing(ApiThread):
    """Look up whether we follow a user and report it to the pane."""

    def __init__(self, api, pane, user):
        ApiThread.__init__(self, api)
        self.pane = pane
        self.user = user

    def run(self):
        screen_name = re.sub('user: ', '', self.user)

        try:
            with self.api.lock:
                relationship = self.api.ShowFriendships(
                    target_screen_name=screen_name)
            following = relationship.source.following
        except (HTTPError, URLError):
            # Treat a failed lookup as "not following"
            following = False

        # NOTE(review): unlike update_window in GetTweets, this call is not
        # wrapped in gtk.gdk.threads_enter()/threads_leave() -- confirm that
        # set_following takes care of GDK locking itself.
        self.pane.set_following(following)
### End class GetFollowing


class GetVerified(ApiThread):
    """Look up whether a user is verified and report it to the pane."""

    def __init__(self, api, pane, user):
        ApiThread.__init__(self, api)
        self.pane = pane
        self.user = user

    def run(self):
        screen_name = re.sub('user: ', '', self.user)

        try:
            with self.api.lock:
                user = self.api.GetUser(screen_name)
            verified = user.verified
        except (HTTPError, URLError):
            # Treat a failed lookup as "not verified"
            verified = False

        # NOTE(review): called without the GDK lock, same as
        # GetFollowing.run -- confirm set_verified handles that itself.
        self.pane.set_verified(verified)
### End class GetVerified


class GetUserLists(ApiThread):
    """Fetch the account's lists and announce them via 'lists-ready'."""

    def __init__(self, api):
        ApiThread.__init__(self, api)
        self.sig_proxy = SigProxy()

    def run(self):
        lists = []
        done = False

        # NOTE(review): this retries immediately and forever on
        # HTTPError/URLError; there is no back-off or retry limit.
        while not done:
            done = True
            try:
                with self.api.lock:
                    lists = self.api.GetUserLists()
            except (HTTPError, URLError):
                done = False

        self.sig_proxy.emit('lists-ready', lists, None)
### End class GetUserLists


class SigProxy(gtk.Alignment):
    """A gtk.Alignment subclass used only as a carrier for signals."""

    def __init__(self):
        gtk.Alignment.__init__(self)
# End class SigProxy

# 'lists-ready' carries two arbitrary Python objects
gobject.signal_new("lists-ready", SigProxy,
                   gobject.SIGNAL_RUN_LAST,
                   gobject.TYPE_NONE,
                   (gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT))


# We use these classes to emulate a Status object when we need
# one to be built out of something else.
class Status():
    """Minimal stand-in carrying the status fields set in this module."""

    def __init__(self):
        self.user = User()
        self.id = None
        self.created_at = None
        self.in_reply_to_screen_name = None
        self.in_reply_to_status_id = None
        # Both converters below assign text; default it so the attribute
        # always exists.
        self.text = None


class User():
    """Minimal stand-in carrying the user fields set in this module."""

    def __init__(self):
        self.screen_name = None
        self.name = None


# To keep things simple elsewhere and improve code reuse
# we'll build a list of home-cooked Status objects out of results.
# Why is this even necessary?
# Why can't we have more consistency out of the Twitter API?
def results_to_statuses(results):
    """Convert search results (results.results) into Status objects.

    Raises IndexError if a result's created_at has fewer than six
    space-separated fields.
    """
    statuses = []
    for result in results.results:
        status = Status()  # already carries a fresh User()
        status.id = result.id
        status.user.screen_name = result.from_user
        status.user.name = ""
        status.in_reply_to_screen_name = result.to_user

        # The Twitter Search API has different timestamps than the
        # REST API, so the fields are reordered here: commas are dropped
        # and fields 0..5 are emitted in the order 0 2 1 4 5 3.
        # fixme:
        # Gotta be a cleaner way to do this, but I can't think of it
        # right now
        parts = result.created_at.replace(',', '').split(' ')
        status.created_at = ' '.join(
            [parts[i] for i in (0, 2, 1, 4, 5, 3)])

        status.text = result.text
        statuses.append(status)
    return statuses


def dms_to_statuses(direct_messages):
    """Convert an iterable of direct messages into Status objects."""
    statuses = []
    for dm in direct_messages:
        status = Status()  # already carries a fresh User()
        status.id = dm.id
        status.user.screen_name = dm.sender_screen_name
        status.user.name = dm.sender.name
        status.created_at = dm.created_at
        status.text = dm.text
        statuses.append(status)
    return statuses


def add_to_cache(status):
    """Download the author's profile image into AvCache if it is missing.

    URLError (which includes HTTPError) during the download is ignored
    on purpose: the cache entry is simply not created.
    """
    screen_name = status.user.screen_name

    with AvCache().lock:
        hit = AvCache().map.has_key(screen_name)
    if hit:
        return

    # The cache lock is deliberately not held during the download.
    try:
        # NOTE(review): python-twitter's User normally exposes
        # `profile_image_url`; confirm that `profile.image_url` is the
        # intended attribute chain.
        response = urlopen(status.user.profile.image_url)
        try:
            data = response.read()
        finally:
            # Always release the HTTP connection
            response.close()
    except URLError:
        return  # Nothing needs be done, just catch & ignore

    loader = gtk.gdk.PixbufLoader()
    loader.write(str(data))
    # Close before fetching so the loader has parsed all written data
    loader.close()
    image = loader.get_pixbuf()

    with AvCache().lock:
        AvCache().map[screen_name] = image