# -*- coding: utf-8 -*- ### # Copyright (c) 2015, butterscotchstallion # All rights reserved. # # ### import supybot.utils as utils from supybot.commands import * import supybot.plugins as plugins import supybot.ircmsgs as ircmsgs import supybot.ircutils as ircutils import supybot.callbacks as callbacks import re import requests try: from urlparse import urlparse from urllib import urlencode except ImportError: from urllib.parse import urlencode, urlparse from bs4 import BeautifulSoup import random import json import cgi import datetime from jinja2 import Template from datetime import timedelta import timeout_decorator import unicodedata try: from supybot.i18n import PluginInternationalization _ = PluginInternationalization("SpiffyTitles") except ImportError: # Placeholder that allows to run the plugin on a bot # without the i18n module _ = lambda x: x class SpiffyTitles(callbacks.Plugin): """Displays link titles when posted in a channel""" threaded = True callBefore = ["Web"] link_cache = [] handlers = {} wall_clock_timeout = 8 max_request_retries = 3 imgur_client = None def __init__(self, irc): self.__parent = super(SpiffyTitles, self) self.__parent.__init__(irc) self.wall_clock_timeout = self.registryValue("wallClockTimeoutInSeconds") self.default_handler_enabled = self.registryValue("defaultHandlerEnabled") self.add_handlers() def add_handlers(self): """ Adds all handlers """ self.add_youtube_handlers() self.add_imdb_handlers() self.add_imgur_handlers() self.add_coub_handlers() self.add_vimeo_handlers() self.add_dailymotion_handlers() def add_dailymotion_handlers(self): self.handlers["www.dailymotion.com"] = self.handler_dailymotion def add_vimeo_handlers(self): self.handlers["vimeo.com"] = self.handler_vimeo def add_coub_handlers(self): self.handlers["coub.com"] = self.handler_coub def handler_dailymotion(self, url, info): """ Handles dailymotion links """ dailymotion_handler_enabled = self.registryValue("dailymotionHandlerEnabled") self.log.info("SpiffyTitles: calling dailymotion handler for %s" % url) title = None video_id = None """ Get video ID """ if dailymotion_handler_enabled and "/video/" in info.path: video_id = info.path.lstrip("/video/").split("_")[0] self.log.info("SpiffyTitles: %s" % str(info)) if video_id is not None: api_url = "https://api.dailymotion.com/video/%s?fields=id,title,owner.screenname,duration,views_total" % video_id self.log.info("SpiffyTitles: looking up dailymotion info: %s", api_url) agent = self.get_user_agent() headers = { "User-Agent": agent } request = requests.get(api_url, headers=headers) ok = request.status_code == requests.codes.ok if ok: response = json.loads(request.text) if response is not None and "title" in response: video = response dailymotion_template = Template(self.registryValue("dailymotionVideoTitleTemplate")) video["views_total"] = "{:,}".format(int(video["views_total"])) video["duration"] = self.get_duration_from_seconds(video["duration"]) video["ownerscreenname"] = video["owner.screenname"] title = dailymotion_template.render(video) else: self.log.info("SpiffyTitles: received unexpected payload from video: %s" % api_url) else: self.log.error("SpiffyTitles: dailymotion handler returned %s: %s" % (request.status_code, request.text[:200])) if title is None: self.log.info("SpiffyTitles: could not get dailymotion info for %s" % url) return self.handler_default(url) else: return title def handler_vimeo(self, url, domain): """ Handles Vimeo links """ vimeo_handler_enabled = self.registryValue("vimeoHandlerEnabled") self.log.info("SpiffyTitles: calling vimeo handler for %s" % url) title = None video_id = None """ Get video ID """ if vimeo_handler_enabled: result = re.search(r'^(http(s)://)?(www\.)?(vimeo\.com/)?(\d+)', url) if result is not None: video_id = result.group(5) if video_id is not None: api_url = "https://vimeo.com/api/v2/video/%s.json" % video_id self.log.info("SpiffyTitles: looking up vimeo info: %s", api_url) agent = self.get_user_agent() headers = { "User-Agent": agent } request = requests.get(api_url, headers=headers) ok = request.status_code == requests.codes.ok if ok: response = json.loads(request.text) if response is not None and "title" in response[0]: video = response[0] vimeo_template = Template(self.registryValue("vimeoTitleTemplate")) """ Some videos do not have this information available """ if "stats_number_of_plays" in video: video["stats_number_of_plays"] = "{:,}".format(int(video["stats_number_of_plays"])) else: video["stats_number_of_plays"] = 0 if "stats_number_of_comments" in video: video["stats_number_of_comments"] = "{:,}".format(int(video["stats_number_of_comments"])) else: video["stats_number_of_comments"] = 0 video["duration"] = self.get_duration_from_seconds(video["duration"]) title = vimeo_template.render(video) else: self.log.info("SpiffyTitles: received unexpected payload from video: %s" % api_url) else: self.log.error("SpiffyTitles: vimeo handler returned %s: %s" % (request.status_code, request.text[:200])) if title is None: self.log.info("SpiffyTitles: could not get vimeo info for %s" % url) return self.handler_default(url) else: return title def handler_coub(self, url, domain): """ Handles coub.com links """ coub_handler_enabled = self.registryValue("coubHandlerEnabled") self.log.info("SpiffyTitles: calling coub handler for %s" % url) title = None """ Get video ID """ if coub_handler_enabled and "/view/" in url: video_id = url.split("/view/")[1] """ Remove any query strings """ if "?" in video_id: video_id = video_id.split("?")[0] api_url = "http://coub.com/api/v2/coubs/%s" % video_id agent = self.get_user_agent() headers = { "User-Agent": agent } request = requests.get(api_url, headers=headers) ok = request.status_code == requests.codes.ok if ok: response = json.loads(request.text) if response: video = response coub_template = Template(self.registryValue("coubTemplate")) video["likes_count"] = "{:,}".format(int(video["likes_count"])) video["recoubs_count"] = "{:,}".format(int(video["recoubs_count"])) video["views_count"] = "{:,}".format(int(video["views_count"])) title = coub_template.render(video) else: self.log.error("SpiffyTitles: coub handler returned %s: %s" % (request.status_code, request.text[:200])) if title is None: if coub_handler_enabled: self.log.info("SpiffyTitles: %s does not appear to be a video link!" % url) return self.handler_default(url) else: return title def add_imgur_handlers(self): # Images mostly self.handlers["i.imgur.com"] = self.handler_imgur_image # Albums, galleries, etc self.handlers["imgur.com"] = self.handler_imgur def initialize_imgur_client(self): """ Check if imgur client id or secret are set, and if so initialize imgur API client """ if self.imgur_client is None: imgur_client_id = self.registryValue("imgurClientID") imgur_client_secret = self.registryValue("imgurClientSecret") imgur_handler_enabled = self.registryValue("imgurHandlerEnabled") if imgur_handler_enabled and imgur_client_id and imgur_client_secret: self.log.info("SpiffyTitles: enabling imgur handler") # Initialize API client try: from imgurpython import ImgurClient from imgurpython.helpers.error import ImgurClientError try: self.imgur_client = ImgurClient(imgur_client_id, imgur_client_secret) except ImgurClientError as e: self.log.error("SpiffyTitles: imgur client error: %s" % (e.error_message)) except ImportError as e: self.log.error("SpiffyTitles ImportError: %s" % str(e)) def doPrivmsg(self, irc, msg): """ Observe each channel message and look for links """ ignore_actions = self.registryValue("ignoreActionLinks") channel = msg.args[0].lower() is_channel = irc.isChannel(channel) is_ctcp = ircmsgs.isCtcp(msg) message = msg.args[1] title = None bot_nick = irc.nick origin_nick = msg.nick is_message_from_self = origin_nick.lower() == bot_nick.lower() """ Configuration option determines whether we should ignore links that appear within an action """ if is_ctcp and ignore_actions: return if is_channel and not is_message_from_self: channel_is_allowed = self.is_channel_allowed(channel) url = self.get_url_from_message(message) ignore_match = self.message_matches_ignore_pattern(message) if ignore_match: self.log.info("SpiffyTitles: ignoring message due to linkMessagePattern match") return if url: # Check if channel is allowed based on white/black list restrictions if not channel_is_allowed: self.log.info("SpiffyTitles: not responding to link in %s due to black/white list restrictions" % (channel)) return info = urlparse(url) domain = info.netloc is_ignored = self.is_ignored_domain(domain) if is_ignored: self.log.info("SpiffyTitles: URL ignored due to domain blacklist match: %s" % url) return is_whitelisted_domain = self.is_whitelisted_domain(domain) if self.registryValue("whitelistDomainPattern") and not is_whitelisted_domain: self.log.info("SpiffyTitles: URL ignored due to domain whitelist mismatch: %s" % url) return title = self.get_title_by_url(url) if title is not None and title: self.log.info("SpiffyTitles: title found: %s" % (title)) ignore_match = self.title_matches_ignore_pattern(title) if ignore_match: self.log.info("SpiffyTitles: ignoring title due to ignoredTitlePattern match") return else: irc.sendMsg(ircmsgs.privmsg(channel, title)) else: if self.default_handler_enabled: self.log.debug("SpiffyTitles: could not get a title for %s" % (url)) else: self.log.debug("SpiffyTitles: could not get a title for %s but default handler is disabled" % (url)) def get_title_by_url(self, url): """ Retrieves the title of a website based on the URL provided """ info = urlparse(url) domain = info.netloc title = None """ Check if we have this link cached according to the cache lifetime. If so, serve link from the cache instead of calling handlers. """ cached_link = self.get_link_from_cache(url) if cached_link is not None: title = cached_link["title"] else: if domain in self.handlers: handler = self.handlers[domain] title = handler(url, info) else: if self.default_handler_enabled: title = self.handler_default(url) if title is not None: title = self.get_formatted_title(title) # Update link cache self.log.info("SpiffyTitles: caching %s" % (url)) now = datetime.datetime.now() self.link_cache.append({ "url": url, "timestamp": now, "title": title }) return title def t(self, irc, msg, args, query): """ Retrieves title for a URL on demand """ message = msg.args[1] channel = msg.args[0] url = self.get_url_from_message(message) title = None error_message = self.registryValue("onDemandTitleError") try: if url: title = self.get_title_by_url(query) except: pass if title is not None and title: irc.sendMsg(ircmsgs.privmsg(channel, title)) else: irc.sendMsg(ircmsgs.privmsg(channel, error_message)) t = wrap(t, ['text']) def get_link_from_cache(self, url): """ Looks for a URL in the link cache and returns info about if it's not stale according to the configured cache lifetime, or None. If linkCacheLifetimeInSeconds is 0, then cache is disabled and we can immediately return """ cache_lifetime_in_seconds = int(self.registryValue("linkCacheLifetimeInSeconds")) if cache_lifetime_in_seconds == 0: return # No cache yet if len(self.link_cache) == 0: return cached_link = None now = datetime.datetime.now() stale = False seconds = 0 for link in self.link_cache: if link["url"] == url: cached_link = link break # Found link, check timestamp if cached_link is not None: seconds = (now - cached_link["timestamp"]).total_seconds() stale = seconds >= cache_lifetime_in_seconds if stale: self.log.info("SpiffyTitles: %s was sent %s seconds ago" % (url, seconds)) else: self.log.info("SpiffyTitles: serving link from cache: %s" % (url)) return cached_link def add_imdb_handlers(self): """ Enables meta info about IMDB links through the OMDB API """ self.handlers["www.imdb.com"] = self.handler_imdb self.handlers["imdb.com"] = self.handler_imdb def add_youtube_handlers(self): """ Adds handlers for Youtube videos. The handler is matched based on the domain used in the URL. """ self.handlers["youtube.com"] = self.handler_youtube self.handlers["www.youtube.com"] = self.handler_youtube self.handlers["youtu.be"] = self.handler_youtube self.handlers["m.youtube.com"] = self.handler_youtube def is_channel_allowed(self, channel): """ Checks channel whitelist and blacklist to determine if the current channel is allowed to display titles. """ channel = channel.lower() is_allowed = False white_list = self.filter_empty(self.registryValue("channelWhitelist")) black_list = self.filter_empty(self.registryValue("channelBlacklist")) white_list_empty = len(white_list) == 0 black_list_empty = len(black_list) == 0 # Most basic case, which is that both white and blacklist are empty. Any channel is allowed. if white_list_empty and black_list_empty: is_allowed = True # If there is a white list, blacklist is ignored. if white_list: is_allowed = channel in white_list # Finally, check blacklist if not white_list and black_list: is_allowed = channel not in black_list return is_allowed def filter_empty(self, input): """ Remove all empty strings from a list """ return set([channel for channel in input if len(channel.strip())]) def is_ignored_domain(self, domain): """ Checks domain against a regular expression """ pattern = self.registryValue("ignoredDomainPattern") if pattern: self.log.debug("SpiffyTitles: matching %s against %s" % (domain, str(pattern))) try: pattern_search_result = re.search(pattern, domain) if pattern_search_result is not None: match = pattern_search_result.group() return match except re.Error: self.log.error("SpiffyTitles: invalid regular expression: %s" % (pattern)) def is_whitelisted_domain(self, domain): """ Checks domain against a regular expression """ pattern = self.registryValue("whitelistDomainPattern") if pattern: self.log.debug("SpiffyTitles: matching %s against %s" % (domain, str(pattern))) try: pattern_search_result = re.search(pattern, domain) if pattern_search_result is not None: match = pattern_search_result.group() return match except re.Error: self.log.error("SpiffyTitles: invalid regular expression: %s" % (pattern)) def get_video_id_from_url(self, url, info): """ Get YouTube video ID from URL """ try: path = info.path domain = info.netloc video_id = "" if domain == "youtu.be": video_id = path.split("/")[1] else: parsed = cgi.parse_qsl(info.query) params = dict(parsed) if "v" in params: video_id = params["v"] if video_id: return video_id else: self.log.error("SpiffyTitles: error getting video id from %s" % (url)) except IndexError as e: self.log.error("SpiffyTitles: error getting video id from %s (%s)" % (url, str(e))) def handler_youtube(self, url, domain): """ Uses the Youtube API to provide additional meta data about Youtube Video links posted. """ youtube_handler_enabled = self.registryValue("youtubeHandlerEnabled") developer_key = self.registryValue("youtubeDeveloperKey") if not youtube_handler_enabled: return None if not developer_key: self.log.info("SpiffyTitles: no Youtube developer key set! Check the documentation for instructions.") return None self.log.info("SpiffyTitles: calling Youtube handler for %s" % (url)) video_id = self.get_video_id_from_url(url, domain) yt_template = Template(self.registryValue("youtubeTitleTemplate")) title = "" if video_id: options = { "part": "snippet,statistics,contentDetails", "maxResults": 1, "key": developer_key, "id": video_id } encoded_options = urlencode(options) api_url = "https://www.googleapis.com/youtube/v3/videos?%s" % (encoded_options) agent = self.get_user_agent() headers = { "User-Agent": agent } self.log.info("SpiffyTitles: requesting %s" % (api_url)) request = requests.get(api_url, headers=headers) ok = request.status_code == requests.codes.ok if ok: response = json.loads(request.text) if response: try: items = response["items"] video = items[0] snippet = video["snippet"] title = snippet["title"] statistics = video["statistics"] view_count = 0 like_count = 0 dislike_count = 0 comment_count = 0 favorite_count = 0 if "viewCount" in statistics: view_count = "{:,}".format(int(statistics["viewCount"])) if "likeCount" in statistics: like_count = "{:,}".format(int(statistics["likeCount"])) if "dislikeCount" in statistics: dislike_count = "{:,}".format(int(statistics["dislikeCount"])) if "favoriteCount" in statistics: favorite_count = "{:,}".format(int(statistics["favoriteCount"])) if "commentCount" in statistics: comment_count = "{:,}".format(int(statistics["commentCount"])) channel_title = snippet["channelTitle"] duration_seconds = self.get_total_seconds_from_duration(video["contentDetails"]["duration"]) """ #23 - If duration is zero, then it"s a LIVE video """ if duration_seconds > 0: duration = self.get_duration_from_seconds(duration_seconds) else: duration = "LIVE" yt_logo = self.get_youtube_logo() compiled_template = yt_template.render({ "title": title, "duration": duration, "view_count": view_count, "like_count": like_count, "dislike_count": dislike_count, "comment_count": comment_count, "favorite_count": favorite_count, "channel_title": channel_title, "yt_logo": yt_logo }) title = compiled_template except IndexError as e: self.log.error("SpiffyTitles: IndexError parsing Youtube API JSON response: %s" % (str(e))) else: self.log.error("SpiffyTitles: Error parsing Youtube API JSON response") else: self.log.error("SpiffyTitles: Youtube API HTTP %s: %s" % (request.status_code, request.text)) # If we found a title, return that. otherwise, use default handler if title: return title else: self.log.info("SpiffyTitles: falling back to default handler") return self.handler_default(url) def get_duration_from_seconds(self, duration_seconds): m, s = divmod(duration_seconds, 60) h, m = divmod(m, 60) duration = "%02d:%02d" % (m, s) """ Only include hour if the video is at least 1 hour long """ if h > 0: duration = "%02d:%s" % (h, duration) return duration def get_youtube_logo(self): colored_letters = [ "%s" % ircutils.mircColor("You", fg="red", bg="white"), "%s" % ircutils.mircColor("Tube", fg="white", bg="red") ] yt_logo = "".join(colored_letters) return yt_logo def get_total_seconds_from_duration(self, input): """ Duration comes in a format like this: PT4M41S which translates to 4 minutes and 41 seconds. This method returns the total seconds so that the duration can be parsed as usual. """ pattern = regex = re.compile('(?P-?)P(?:(?P\d+)Y)?(?:(?P\d+)M)?(?:(?P\d+)D)?(?:T(?:(?P\d+)H)?(?:(?P\d+)M)?(?:(?P\d+)S)?)?') duration = regex.match(input).groupdict(0) delta = timedelta(hours=int(duration['hours']), minutes=int(duration['minutes']), seconds=int(duration['seconds'])) return delta.total_seconds() def handler_default(self, url): """ Default handler for websites """ default_handler_enabled = self.registryValue("defaultHandlerEnabled") if default_handler_enabled: self.log.info("SpiffyTitles: calling default handler for %s" % (url)) default_template = Template(self.registryValue("defaultTitleTemplate")) html = self.get_source_by_url(url) if html is not None and html: title = self.get_title_from_html(html) if title is not None: title_template = default_template.render(title=title) return title_template else: self.log.info("SpiffyTitles: default handler fired but doing nothing because disabled") def handler_imdb(self, url, info): """ Handles imdb.com links, querying the OMDB API for additional info Typical IMDB URL: http://www.imdb.com/title/tt2467372/ """ headers = self.get_headers() result = None if not self.registryValue("imdbHandlerEnabled"): self.log.info("SpiffyTitles: IMDB handler disabled. Falling back to default handler.") return self.handler_default(url) # Don't care about query strings if "?" in url: url = url.split("?")[0] # We can only accommodate a specific format of URL here if "/title/" in url: imdb_id = url.split("/title/")[1].rstrip("/") omdb_url = "http://www.omdbapi.com/?i=%s&plot=short&r=json" % (imdb_id) try: request = requests.get(omdb_url, timeout=10, headers=headers) if request.status_code == requests.codes.ok: response = json.loads(request.text) result = None imdb_template = Template(self.registryValue("imdbTemplate")) not_found = "Error" in response unknown_error = response["Response"] != "True" if not_found or unknown_error: self.log.info("SpiffyTitles: OMDB error for %s" % (omdb_url)) else: result = imdb_template.render(response) else: self.log.error("SpiffyTitles OMDB API %s - %s" % (request.status_code, request.text)) except requests.exceptions.Timeout as e: self.log.error("SpiffyTitles imdb Timeout: %s" % (str(e))) except requests.exceptions.ConnectionError as e: self.log.error("SpiffyTitles imdb ConnectionError: %s" % (str(e))) except requests.exceptions.HTTPError as e: self.log.error("SpiffyTitles imdb HTTPError: %s" % (str(e))) if result is not None: return result else: self.log.info("SpiffyTitles: IMDB handler failed. calling default handler") return self.handler_default(url) def is_valid_imgur_id(self, input): """ Tests if input matches the typical imgur id, which seems to be alphanumeric. Images, galleries, and albums all share their format in their identifier. """ match = re.match(r"[a-z0-9]+", input, re.IGNORECASE) return match is not None def handler_imgur(self, url, info): """ Queries imgur API for additional information about imgur links. This handler is for any imgur.com domain. """ self.initialize_imgur_client() is_album = info.path.startswith("/a/") is_gallery = info.path.startswith("/gallery/") is_image_page = not is_album and not is_gallery and re.match(r"^\/[a-zA-Z0-9]+", info.path) result = None if is_album: result = self.handler_imgur_album(url, info) #elif is_image_page: # result = self.handler_imgur_image(url, info) else: result = self.handler_default(url) return result def handler_imgur_album(self, url, info): """ Handles retrieving information about albums from the imgur API. imgur provides the following information about albums: https://api.imgur.com/models/album """ from imgurpython.helpers.error import ImgurClientRateLimitError from imgurpython.helpers.error import ImgurClientError self.initialize_imgur_client() if self.imgur_client: album_id = info.path.split("/a/")[1] """ If there is a query string appended, remove it """ if "?" in album_id: album_id = album_id.split("?")[0] if self.is_valid_imgur_id(album_id): self.log.info("SpiffyTitles: found imgur album id %s" % (album_id)) try: album = self.imgur_client.get_album(album_id) if album: imgur_album_template = Template(self.registryValue("imgurAlbumTemplate")) compiled_template = imgur_album_template.render({ "title": album.title, "section": album.section, "view_count": "{:,}".format(album.views), "image_count": "{:,}".format(album.images_count), "nsfw": album.nsfw }) return compiled_template else: self.log.error("SpiffyTitles: imgur album API returned unexpected results!") except ImgurClientRateLimitError as e: self.log.error("SpiffyTitles: imgur rate limit error: %s" % (e.error_message)) except ImgurClientError as e: self.log.error("SpiffyTitles: imgur client error: %s" % (e.error_message)) else: self.log.info("SpiffyTitles: unable to determine album id for %s" % (url)) else: return self.handler_default(url) def handler_imgur_image(self, url, info): """ Handles retrieving information about images from the imgur API. Used for both direct images and imgur.com/some_image_id_here type links, as they're both single images. """ self.initialize_imgur_client() from imgurpython.helpers.error import ImgurClientRateLimitError from imgurpython.helpers.error import ImgurClientError title = None if self.imgur_client: """ If there is a period in the path, it's a direct link to an image. If not, then it's a imgur.com/some_image_id_here type link """ if "." in info.path: path = info.path.lstrip("/") image_id = path.split(".")[0] else: image_id = info.path.lstrip("/") if self.is_valid_imgur_id(image_id): self.log.info("SpiffyTitles: found image id %s" % (image_id)) try: image = self.imgur_client.get_image(image_id) if image: imgur_template = Template(self.registryValue("imgurTemplate")) readable_file_size = self.get_readable_file_size(image.size) compiled_template = imgur_template.render({ "title": image.title, "type": image.type, "nsfw": image.nsfw, "width": image.width, "height": image.height, "view_count": "{:,}".format(image.views), "file_size": readable_file_size, "section": image.section }) title = compiled_template else: self.log.error("SpiffyTitles: imgur API returned unexpected results!") except ImgurClientRateLimitError as e: self.log.error("SpiffyTitles: imgur rate limit error: %s" % (e.error_message)) except ImgurClientError as e: self.log.error("SpiffyTitles: imgur client error: %s" % (e.error_message)) else: self.log.error("SpiffyTitles: error retrieving image id for %s" % (url)) if title is not None: return title else: return self.handler_default(url) def get_readable_file_size(self, num, suffix="B"): """ Returns human readable file size """ for unit in ["","Ki","Mi","Gi","Ti","Pi","Ei","Zi"]: if abs(num) < 1024.0: return "%3.1f%s%s" % (num, unit, suffix) num /= 1024.0 return "%.1f%s%s" % (num, "Yi", suffix) def get_formatted_title(self, title): """ Remove cruft from title and apply bold if applicable """ useBold = self.registryValue("useBold") # Replace anywhere in string title = title.replace("\n", "") title = title.replace("\t", "") title = re.sub(" +", " ", title) if useBold: title = ircutils.bold(title) title = title.strip() return title def get_title_from_html(self, html): """ Retrieves value of tag from HTML """ soup = BeautifulSoup(html, "lxml") if soup is not None: """ Some websites have more than one title tag, so get all of them and take the last value """ head = soup.find("head") titles = head.find_all("title") if titles is not None and len(titles): title_text = titles[-1].get_text() if len(title_text): stripped_title = title_text.strip() return stripped_title @timeout_decorator.timeout(wall_clock_timeout) def get_source_by_url(self, url, retries=1): """ Get the HTML of a website based on a URL """ max_retries = self.registryValue("maxRetries") if retries is None: retries = 1 if retries >= max_retries: self.log.info("SpiffyTitles: hit maximum retries for %s" % url) return None self.log.info("SpiffyTitles: attempt #%s for %s" % (retries, url)) try: headers = self.get_headers() self.log.info("SpiffyTitles: requesting %s" % (url)) request = requests.get(url, headers=headers, timeout=10, allow_redirects=True) if request.status_code == requests.codes.ok: # Check the content type which comes in the format: "text/html; charset=UTF-8" content_type = request.headers.get("content-type").split(";")[0].strip() acceptable_types = self.registryValue("mimeTypes") self.log.info("SpiffyTitles: content type %s" % (content_type)) if content_type in acceptable_types: text = request.content if text: return text else: self.log.info("SpiffyTitles: empty content from %s" % (url)) else: self.log.debug("SpiffyTitles: unacceptable mime type %s for url %s" % (content_type, url)) else: self.log.error("SpiffyTitles HTTP response code %s - %s" % (request.status_code, request.content)) except timeout_decorator.TimeoutError: self.log.error("SpiffyTitles: wall timeout!") self.get_source_by_url(url, retries+1) except requests.exceptions.MissingSchema as e: urlWithSchema = "http://%s" % (url) self.log.error("SpiffyTitles missing schema. Retrying with %s" % (urlWithSchema)) return self.get_source_by_url(urlWithSchema) except requests.exceptions.Timeout as e: self.log.error("SpiffyTitles Timeout: %s" % (str(e))) self.get_source_by_url(url, retries+1) except requests.exceptions.ConnectionError as e: self.log.error("SpiffyTitles ConnectionError: %s" % (str(e))) self.get_source_by_url(url, retries+1) except requests.exceptions.HTTPError as e: self.log.error("SpiffyTitles HTTPError: %s" % (str(e))) except requests.exceptions.InvalidURL as e: self.log.error("SpiffyTitles InvalidURL: %s" % (str(e))) def get_headers(self): agent = self.get_user_agent() self.accept_language = self.registryValue("language") headers = { "User-Agent": agent, "Accept-Language": ";".join((self.accept_language, "q=1.0")) } return headers def get_user_agent(self): """ Returns a random user agent from the ones available """ agents = self.registryValue("userAgents") return random.choice(agents) def message_matches_ignore_pattern(self, input): """ Checks message against linkMessageIgnorePattern to determine whether the message should be ignored. """ match = False pattern = self.registryValue("linkMessageIgnorePattern") if pattern: match = re.search(pattern, input) return match def title_matches_ignore_pattern(self, input): """ Checks message against ignoredTitlePattern to determine whether the title should be ignored. """ match = False pattern = self.registryValue("ignoredTitlePattern") if pattern: match = re.search(pattern, input) return match def get_url_from_message(self, input): """ Find the first string that looks like a URL from the message """ url_re = self.registryValue("urlRegularExpression") match = re.search(url_re, input) if match: raw_url = match.group(0).strip() url = self.remove_control_characters(unicode(raw_url)) return url def remove_control_characters(self, s): return "".join(ch for ch in s if unicodedata.category(ch)[0]!="C") Class = SpiffyTitles # vim:set shiftwidth=4 softtabstop=4 expandtab textwidth=79: