#!/usr/bin/python
#
# Custom twitter client... mostly for learning Python
#
# Tab (pane) names double as a tiny routing language; update_windows()
# decides what to load for a tab purely from its name:
#
#   <username>/Home              home timeline of that account
#   @<name>                      mentions (own account) or a search
#   <username>/Direct Messages   direct messages of that account
#   user: <screen_name>          timeline of an arbitrary user
#   list: <username>/<list>      a list owned by one of our accounts
#   anything else                a plain search

import sys
import ConfigParser
import os
import re
import optparse
import shelve

import twitter
import gtk
import gtk.glade
import gobject
from urllib2 import HTTPError

from twitterwidgets import TweetPane


class MyTwitter():
    """ Display Tweets, post to twitter """

    def __init__(self, config_file):
        """ Read the config file, log in every account and build the GUI.

        config_file -- path to an ini-style file ('~' is expanded).  Every
                       section whose name starts with 'account-' supplies a
                       'username' and 'password'; the 'global' section
                       supplies 'entries', 'refreshtime' and 'dbfile'.

        Raises ValueError when the file defines no account section.
        """
        config = ConfigParser.ConfigParser()
        config.read(os.path.expanduser(config_file))

        self.accounts = {}
        for section in config.sections():
            if re.match(r'account-', section):
                username = config.get(section, 'username')
                self.accounts[username] = twitter.Api(
                    username=username,
                    password=config.get(section, 'password'))

        # Fail with a readable message instead of an IndexError below
        if not self.accounts:
            raise ValueError('No account-* section found in ' + config_file)

        # The first configured account is the active one until the user
        # picks another in the account selector
        self.username = self.accounts.keys()[0]
        self.api = self.accounts[self.username]

        self.num_entries = config.getint('global', 'entries')
        # Never poll more often than every 10 seconds
        self.refresh_time = max(config.getint('global', 'refreshtime'), 10)

        db_file = os.path.expanduser(config.get('global', 'dbfile'))
        self.db = shelve.open(db_file)

        # First run: seed the default tabs.  Home and Direct Messages must
        # carry the account prefix, otherwise update_windows() does not
        # recognise them and treats them as search terms.
        if 'open_tabs' not in self.db:
            self.db['open_tabs'] = [
                self.username + '/Home',
                '@' + self.username,
                self.username + '/Direct Messages',
            ]

        # Id of the tweet currently being replied to, if any
        self.reply_id = None

        # Load up all the GUI stuff
        self.init_user_interface('./default.glade')
        self.init_widgets()

    def init_user_interface(self, path_to_skin):
        """ Load the "window" widget tree from the glade file and connect
        its signal handlers to the methods of this object """
        self.widget_tree = gtk.glade.XML(path_to_skin, "window")
        self.widget_tree.signal_autoconnect(self)

    def init_widgets(self):
        """ Look up the widgets we use, restore last session's tabs, fill
        the View menu and start the periodic refresh timer """
        # Get widgets from glade
        get_widget = self.widget_tree.get_widget
        self.tweet_notebook = get_widget('tweet_notebook')
        self.view_menu = get_widget('view_menu')
        self.update_entry = get_widget('update_entry')
        self.update_count = get_widget('update_count')
        self.status_bar = get_widget('status_bar')
        self.search_entry = get_widget('search_entry')
        self.following_button = get_widget('following_button')
        self.at_button = get_widget('at_button')
        self.verified_label = get_widget('verified_label')
        self.account_select = get_widget('account_select')

        self.context_id = self.status_bar.get_context_id('message')

        # kill the page that glade makes us have
        self.tweet_notebook.remove_page(0)
        self.account_select.remove_text(0)

        # Add entries to the account select box, set the default entry
        for username in self.accounts.keys():
            self.account_select.append_text(username)
        self.account_select.set_active(0)

        # When we change tabs, any unread tweets in it become read
        self.tweet_notebook.connect('switch_page', self.on_tab_change)

        # Add the tabs from last session to the notebook
        for tab in self.db['open_tabs']:
            self.add_to_notebook(tab)

        # Put Home, @user, Direct Messages, and lists in the View menu for
        # each user
        for username in self.accounts.keys():
            outer_menu_item = gtk.MenuItem(username)
            self.view_menu.append(outer_menu_item)
            new_menu = gtk.Menu()
            outer_menu_item.set_submenu(new_menu)

            lists = self.accounts[username].GetUserLists()
            # Fixed entries first, then the account's lists alphabetically
            view_names = ['Home', '@' + username, 'Direct Messages']
            view_names.extend(sorted(l.name for l in lists['lists']))

            for view_name in view_names:
                menu_item = gtk.MenuItem(view_name)
                new_menu.append(menu_item)
                menu_item.connect('activate', self.on_view_selected,
                                  username, view_name)
                menu_item.show()

            outer_menu_item.show()

        # Timer to update periodically
        gobject.timeout_add(self.refresh_time * 1000, self.update_windows)

    def update_windows(self):
        """ Reload every tab except single-tweet tabs.

        Doubles as the gobject timeout callback, hence the True return.
        """
        for i in range(self.tweet_notebook.get_n_pages()):
            pane = self.tweet_notebook.get_nth_page(i)

            # Single tweets should never be updated here
            if pane.get_single_tweet() is not None:
                continue

            pane.update_window(self._fetch_statuses(pane.get_list_name()))

        # We have to return true, so the timeout_add event will keep happening
        return True

    def _fetch_statuses(self, list_name):
        """ Return the statuses for the tab called list_name, choosing the
        API call from the shape of the name """
        # username/Home entries need to load the appropriate Home feed
        if re.search(r'/Home', list_name):
            account = self.accounts[re.sub(r'/Home', r'', list_name)]
            return account.GetHomeTimeline(count=self.num_entries)

        # For @username, we need to check if it is one of our usernames, or
        # just needs to be searched on
        if list_name.startswith('@'):
            screen_name = re.sub(r'@', r'', list_name)
            if screen_name in self.accounts:
                account = self.accounts[screen_name]
                return account.GetMentions(count=self.num_entries)
            return self._search(list_name)

        # Direct Messages should match like /Home, above
        if re.search(r'/Direct Messages', list_name):
            account = self.accounts[re.sub(r'/Direct Messages', r'',
                                           list_name)]
            return self.dms_to_statuses(account.GetDirectMessages())

        # User lookups go straight to the user
        if re.match(r'user: ', list_name):
            return self.api.GetUserTimeline(
                re.sub(r'^user: ', r'', list_name), count=self.num_entries)

        # Lists load the appropriate list from the appropriate account
        if re.match(r'list: ', list_name):
            parsed = re.match(r'list: (.*)/(.*)', list_name)
            if parsed:
                owner, real_list = parsed.groups()
            else:
                # Tab names without an owner ('list: name') may have been
                # persisted by earlier versions; use the active account
                owner = self.username
                real_list = re.sub(r'^list: ', r'', list_name)
            account = self.accounts[owner]
            return account.GetListStatuses(real_list,
                                           per_page=self.num_entries)

        # Everything else is a straight search
        return self._search(list_name)

    def _search(self, query):
        """ Search for query with the active account, return Status objects """
        return self.results_to_statuses(
            self.api.Search(query, rpp=self.num_entries))

    def _current_pane(self):
        """ Return the pane of the tab that is currently selected """
        return self.tweet_notebook.get_nth_page(
            self.tweet_notebook.get_current_page())

    def update_windows_callback(self, widget):
        """ Signal handler: refresh all tabs now """
        self.update_windows()

    def update_status(self):
        """ Post the text in the update entry from the active account, as a
        reply when reply_id is set, then clear the entry """
        text = self.update_entry.get_text()
        self.update_entry.set_text("")
        self.api.PostUpdate(text, in_reply_to_status_id=self.reply_id)
        self.reply_id = None
        self.update_status_bar('Tweet Posted')

    def update_status_callback(self, widget):
        """ Signal handler: post the current update text """
        self.update_status()

    def text_watcher(self, widget):
        """ Watch text entered on the update_entry, update things """
        text_len = self.update_entry.get_text_length()
        self.update_count.set_label(str(text_len) + "/140")

        # If reply_id is set, unset it if we have removed the @ symbol
        if (self.reply_id is not None
                and not self.update_entry.get_text().startswith('@')):
            self.reply_id = None

    def gtk_main_quit(self, widget):
        """ Signal handler: close the tab database and leave the main loop """
        self.db.close()
        gtk.main_quit()

    def on_about(self, widget):
        """ Signal handler for Help -> About (not implemented yet) """
        print("STUB: help->about not yet implemented")

    def on_reply(self, widget, data):
        """ Start a reply: prefill '@screen_name ' and remember the tweet id.

        data -- dict with the keys 'screen_name' and 'id'
        """
        self.update_entry.set_text('@' + data['screen_name'] + ' ')
        self.reply_id = data['id']
        self.update_entry.grab_focus()

    def on_retweet(self, widget, data):
        """ Retweet the tweet whose id is data['id'] from the active account """
        self.api.PostRetweet(data['id'])

    def on_reply_to(self, widget, data):
        """ Open the tweet data['id'] in a single-tweet tab named data['name'] """
        self.add_to_notebook(data['name'], data['id'])

    def on_view_selected(self, event, username, name):
        """ View menu handler: open the tab for the chosen entry.

        username -- the account whose submenu the entry belongs to
        name     -- 'Home', '@<username>', 'Direct Messages' or a list name
        """
        if name == 'Home' or name == 'Direct Messages':
            full_name = username + '/' + name
        elif name == '@' + username:
            full_name = name
        else:
            # Lists are owned by an account; update_windows() needs the
            # owner encoded in the tab name to pick the right API object
            full_name = 'list: ' + username + '/' + name

        # Now, add a new tab with this list
        self.add_to_notebook(full_name)

    def remove_view(self, name):
        """ Remove one of the views from the tweet notebook.

        Called when the close button is clicked on one of the views
        or Ctrl + W is pressed while the view is active.
        """
        # Single-tweet tabs are never stored in open_tabs, so only touch
        # the stored list when the name is really in it
        open_tabs = self.db['open_tabs']
        if name in open_tabs:
            open_tabs.remove(name)
            # Reassign: the shelf only persists on item assignment
            self.db['open_tabs'] = open_tabs

        for i in range(self.tweet_notebook.get_n_pages()):
            pane = self.tweet_notebook.get_nth_page(i)
            if pane.get_list_name() == name:
                self.tweet_notebook.remove_page(i)
                return

    def remove_view_callback(self, event, name):
        """ Signal handler: close the tab called name """
        self.remove_view(name)

    def add_to_notebook(self, name, single_tweet=None):
        """ Open a tab called name, or switch to it if it is already open.

        single_tweet -- when given, the id of the one tweet the tab shows;
                        such tabs are neither persisted nor refreshed
        """
        # If it already exists, don't add it, just switch to it
        for i in range(self.tweet_notebook.get_n_pages()):
            pane = self.tweet_notebook.get_nth_page(i)
            # Unless it is a single tweet... ignore those unless
            # we are also a single tweet... then, special logic
            if pane.get_single_tweet() is not None:
                if pane.get_single_tweet() == single_tweet:
                    self.tweet_notebook.set_current_page(i)
                    return
            elif pane.get_list_name() == name:
                self.tweet_notebook.set_current_page(i)
                return

        # We check for the name so that the special case of
        # the first run is handled...
        if single_tweet is None and name not in self.db['open_tabs']:
            open_tabs = self.db['open_tabs']
            open_tabs.append(name)
            # Reassign: the shelf only persists on item assignment
            self.db['open_tabs'] = open_tabs

        is_user = name.startswith('user:')
        following = False
        verified = False
        if is_user:
            following = self.check_following(name)
            verified = self.check_verified(name)

        new_pane = TweetPane(name, num_entries=self.num_entries,
                             single_tweet=single_tweet, is_user=is_user,
                             following=following, verified=verified)
        self.tweet_notebook.append_page_menu(new_pane,
                                             new_pane.get_tab_label(),
                                             gtk.Label(name))
        new_pane.get_tab_label().connect('close-clicked',
                                         self.remove_view_callback, name)
        new_pane.connect('tweet-reply', self.on_reply)
        new_pane.connect('tweet-retweet', self.on_retweet)
        new_pane.connect('tweet-in-reply-to', self.on_reply_to)
        new_pane.connect('show-user', self.show_user_callback)

        # Special logic for single tweet pane
        if single_tweet is not None:
            try:
                statuses = [self.api.GetStatus(single_tweet)]
                new_pane.update_window(statuses)
            except HTTPError:
                # Drop the pane we just appended and tell the user
                self.tweet_notebook.remove_page(-1)
                self.update_status_bar('Error retrieving tweet id: '
                                       + str(single_tweet))
                return

        self.update_windows()
        self.tweet_notebook.set_current_page(-1)  # switch to the new pane

    def on_tab_change(self, event, page, page_num):
        """ 'switch_page' handler: mark the newly shown tab as read and
        show or hide the user-only controls for it """
        pane = self.tweet_notebook.get_nth_page(page_num)
        pane.set_tweets_read()
        self.update_follow_button(pane)

        if pane.get_is_user():
            self.at_button.show()
        else:
            self.at_button.hide()

        if pane.get_verified():
            self.verified_label.show()
        else:
            self.verified_label.hide()

    def on_search(self, event):
        """ Signal handler: open a tab for the text in the search entry """
        search_string = self.search_entry.get_text()
        self.search_entry.set_text('')
        self.add_to_notebook(search_string)

    def update_status_bar(self, text):
        """ Replace the message shown in the status bar with text """
        self.status_bar.pop(self.context_id)
        self.status_bar.push(self.context_id, text)

    def on_following_button_clicked(self, event):
        """ Follow or unfollow the user of the current tab, then re-read the
        relationship and relabel the button """
        current_pane = self._current_pane()
        user_name = re.sub('^user: ', '', current_pane.get_list_name())

        if current_pane.get_following():
            self.api.DestroyFriendship(user_name)
        else:
            self.api.CreateFriendship(user_name)

        # Ask the API again rather than assuming the call took effect
        current_pane.set_following(self.check_following(user_name))
        self.update_follow_button(current_pane)

    def check_following(self, name):
        """ Return whether the active account follows the given user.

        name -- a screen name; a leading pane prefix 'user: ' is stripped
        """
        screen_name = re.sub('user: ', '', name)
        relationship = self.api.ShowFriendships(
            target_screen_name=screen_name)
        return relationship.source.following

    def check_verified(self, name):
        """ Return the 'verified' attribute of the given user.

        name -- a screen name; a leading pane prefix 'user: ' is stripped
        """
        screen_name = re.sub('user: ', '', name)
        user = self.api.GetUser(screen_name)
        return user.verified

    def update_follow_button(self, pane):
        """ Hide the follow button for non-user panes, otherwise label it
        'Unfollow' or 'Follow' according to the pane's following state """
        if not pane.get_is_user():
            self.following_button.set_label('')
            self.following_button.hide()
            return

        if pane.get_following():
            self.following_button.set_label('Unfollow')
        else:
            self.following_button.set_label('Follow')
        self.following_button.show()

    def show_user(self, name):
        """ Open (or switch to) the timeline tab of the user called name """
        self.add_to_notebook('user: ' + name)

    def show_user_callback(self, widget, data):
        """ 'show-user' handler: data is the screen name to show """
        self.show_user(data)

    def on_at_button_clicked(self, widget):
        """ Open the '@<user>' tab for the user shown in the current tab """
        user_name = re.sub('^user: ', '',
                           self._current_pane().get_list_name())
        self.add_to_notebook('@' + user_name)

    def global_key_press_handler(self, widget, event):
        """ Key press handler: Ctrl + W closes the current tab """
        keyname = gtk.gdk.keyval_name(event.keyval)
        if keyname == 'w' and event.state & gtk.gdk.CONTROL_MASK:
            self.close_current_tab()

    def close_current_tab(self):
        """ Close the tab that is currently selected """
        self.remove_view(self._current_pane().get_list_name())

    def on_account_changed(self, widget):
        """ Make the account picked in the selector the active one; unknown
        selections are ignored """
        new_user = self.account_select.get_active_text()
        if new_user in self.accounts:
            self.username = new_user
            self.api = self.accounts[self.username]

    # To keep things simple elsewhere and improve code reuse
    # we'll build a list of home-cooked Status objects out of results.
    # Why is this even necessary?
    # Why can't we have more consistency out of the Twitter API?
    def results_to_statuses(self, results):
        """ Convert the entries of results.results (search results) into a
        list of Status objects """
        statuses = []
        for result in results.results:
            status = Status()
            status.id = result.id
            status.user = User()
            status.user.screen_name = result.from_user
            status.user.name = ""
            status.in_reply_to_screen_name = result.to_user
            # The Twitter Search API has different timestamps than the
            # REST API, so drop the comma and reorder the six
            # space-separated fields 0..5 into the order 0 2 1 4 5 3.
            # fixme:
            # Gotta be a cleaner way to do this, but I can't think of it
            # right now
            parts = result.created_at.replace(',', '').split(' ')
            status.created_at = ' '.join(
                [parts[0], parts[2], parts[1], parts[4], parts[5], parts[3]])
            status.text = result.text
            statuses.append(status)
        return statuses

    def dms_to_statuses(self, direct_messages):
        """ Convert direct messages into a list of Status objects, with the
        sender as the status user """
        statuses = []
        for dm in direct_messages:
            status = Status()
            status.id = dm.id
            status.user = User()
            status.user.screen_name = dm.sender.screen_name
            status.user.name = dm.sender.name
            status.created_at = dm.created_at
            status.text = dm.text
            statuses.append(status)
        return statuses

### end class MyTwitter


# We use these classes to emulate a Status object when we need
# one to be built out of something else.
class Status():
    """ Plain attribute holder standing in for a status """

    def __init__(self):
        self.user = User()
        self.id = None
        self.created_at = None
        self.text = None
        self.in_reply_to_screen_name = None
        self.in_reply_to_status_id = None


class User():
    """ Plain attribute holder standing in for a user """

    def __init__(self):
        self.screen_name = None
        self.name = None


# main
if __name__ == '__main__':
    parser = optparse.OptionParser()
    parser.add_option('-c', '--config', dest="filename",
                      default="~/.mytwitter")
    (options, args) = parser.parse_args()

    my_twitter = MyTwitter(options.filename)

    gtk.main()