#!/usr/bin/env python

##
# Wilfredo Sanchez, wsanchez@wsanchez.net
#
# iTunes Music Store RSS Feed parser
# http://www.wsanchez.net/itms/
##
# Copyright (c) 2002 Wilfredo Sanchez Vega.
# All rights reserved.
#
# Permission to use, copy, modify, and distribute this software for
# any purpose with or without fee is hereby granted, provided that the
# above copyright notice and this permission notice appear in all
# copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHORS DISCLAIM ALL
# WARRANTIES WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS.  IN NO EVENT SHALL THE
# AUTHORS BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION
# WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
##

__version__ = "1.0"

import types
import time

import feedparser

# Query names accepted by the feed; each one is used as a URL path component.
query_new_releases    = "newreleases"
query_just_added      = "justadded"
query_top_songs       = "topsongs"
query_top_albums      = "topalbums"
query_featured_albums = "featuredalbums"

query_types = (query_new_releases,
               query_just_added,
               query_top_songs,
               query_top_albums,
               query_featured_albums)


def get(limit=None, query_type=None):
    """
    Get data from the iTunes Music Store RSS feed.

    The limit option, if given, specifies the maximum number of items
    to fetch from the feed.  The limit must be a positive integer.  If
    unspecified or None, the limit defaults to 10.

    The query_type option, if given, specifies the query type, which
    may be one of the constants in query_types.  If unspecified, None
    or empty, the query_type defaults to query_new_releases.

    Returns a dictionary with the following keys:
      url,          URL for the feed;
      title,        Feed title;
      description,  Feed description;
      copyright,    Feed copyright;
      time-to-live, Time to live for this data;
      date,         Generation date for this data (time.struct_time object);
      items,        Tuple of data items.

    Data items are dictionaries with the following keys:
      artist_name, artist_url,
      album_title, album_url, album_price, album_copyright,
      album_release_date, album_art_url,
      genres,       Tuple of dictionaries with the keys url and name.

    Raises ValueError if limit or query_type is invalid, HTTPError if
    the feed was not fetched with HTTP status 200, and UnicodeError if
    the response is not encoded as UTF-8.
    """
    # Only a missing limit selects the default; an explicit 0 is invalid
    # input and has to reach the range check below.
    if limit is None:
        limit = 10
    if not query_type:
        query_type = query_new_releases

    # bool is a subclass of int but is not a meaningful item count.
    if not isinstance(limit, int) or isinstance(limit, bool) or limit < 1:
        raise ValueError("limit '" + str(limit) + "' is not a positive integer")
    if query_type not in query_types:
        raise ValueError("query_type '" + str(query_type) + "' is not one of " +
                         ", ".join(query_types))

    feed_server        = "ax.phobos.apple.com.edgesuite.net"
    feed_direct_action = "WebObjects/MZStore.woa/wpa/MRSS/" + query_type
    feed_url           = ("http://" + feed_server + "/" + feed_direct_action +
                          "/limit=" + str(limit) + "/rss.xml")

    feed = feedparser.parse(feed_url)

    # "status" is absent when no HTTP response was received at all; report
    # that as an HTTPError too rather than leaking a KeyError.
    status = feed.get("status")
    if status != 200:
        # FIXME: All 2xx codes are probably fine
        raise HTTPError("Unexpected HTTP response code (" + str(status) + ")")

    if feed["encoding"] != "utf-8":
        # FIXME: convert to UTF-8 rather than whine
        raise UnicodeError("Response encoding is not UTF-8 (" +
                           str(feed["encoding"]) + ")")

    items = []

    for feed_item in feed["items"]:
        # Each category is indexed as (url, name).
        genres = tuple([{"url": genre[0], "name": genre[1]}
                        for genre in feed_item["categories"]])

        items.append({
            "artist_name"       : feed_item["itms_artist"     ],
            "artist_url"        : feed_item["itms_artistlink" ],
            "album_title"       : feed_item["itms_album"      ],
            "album_url"         : feed_item["itms_albumlink"  ],
            "album_price"       : feed_item["itms_albumprice" ],
            "album_copyright"   : feed_item["itms_rights"     ],
            "album_release_date": feed_item["itms_releasedate"],
            "album_art_url"     : feed_item["itms_coverart"   ],
            "genres"            : genres,
        })

    return {
        "url"         : feed["url"],
        "date"        : feed["modified"],
        "title"       : feed["channel"]["title"      ],
        "description" : feed["channel"]["description"],
        "copyright"   : feed["channel"]["rights"     ],
        "time-to-live": feed["channel"]["ttl"        ],
        "items"       : tuple(items),
    }


class HTTPError(OSError):
    """Raised by get() when the feed is not fetched with HTTP status 200."""
    pass


def generate_response_page(itms_data,
                           stylesheet  = None,
                           title       = None,
                           description = None):
    """
    Generate a full response page.

    Prints a Content-type header followed by the page to standard
    output; the page body is produced by generate_html_include().

    itms_data is a dictionary as returned by get().  If title is not
    given, the feed title from itms_data is used.  title is written to
    the output as-is, so callers must escape untrusted text.
    """
    if not title:
        title = itms_data["title"]

    # NOTE(review): the literals in this function appear to have lost their
    # HTML markup in this copy of the file (most are empty or blank, and the
    # stylesheet value is never written).  They are kept exactly as found;
    # restore the tags from the upstream original before deploying.
    print('Content-type: text/html; charset=UTF-8')
    print('')
    print('')
    print('')
    print('')
    print(' ' + title + '')
    print(' ')
    if stylesheet:
        print(' ')
    print('')
    print('')
    print(' ')

    generate_html_include(itms_data, title = title, description = description)

    print(' ')
    print('')


def generate_html_include(itms_data, title = None, description = None):
    """
    Generate an SSI include snippet.

    Prints the snippet to standard output.  itms_data is a dictionary
    as returned by get().  If title or description is not given, the
    corresponding feed value from itms_data is used.  Both are written
    to the output as-is, so callers must escape untrusted text.
    """
    if not title:
        title = itms_data["title"]
    if not description:
        description = itms_data["description"]

    # NOTE(review): the literals in this function appear to have lost their
    # HTML markup in this copy of the file; what remains is text and line
    # breaks only.  They are kept exactly as found (line breaks written as
    # \n); restore the tags from the upstream original before deploying.
    print('\n')
    print('\n\n' + title + '\n\n')
    print('\n\n' + description + '\n\n')
    print('\n    ')

    for index, item in enumerate(itms_data["items"]):
        # Marks the first item.  No surviving literal references this value;
        # presumably it was interpolated into the stripped item markup.
        if index == 0:
            first = 'id="itms-first-item"'
        else:
            first = ''

        print('\n  1. ')
        print(' ')
        print(' ')
        print(' ')
        print('\n    on ' + item["album_release_date"] + '\n    ')
        print('\n    for ' + item["album_price"] + '\n    ')
        print('\n  2. ')

    # NOTE(review): the end of the per-item loop is inferred; the original
    # indentation was lost.  Confirm against the upstream original.
    print('\n')
    print('\n' +
          time.strftime("%a, %d %b %Y %H:%M:%S %Z", itms_data["date"]) +
          '\n')
    print(' ')
    print('\n')


def run_cgi():
    """
    Run as a CGI program.

    Recognized form fields: limit, query, format ("page", the default,
    or "include"), stylesheet, title and description.  On success the
    requested output is printed; otherwise the collected error messages
    are printed as text/plain.
    """
    import cgi
    import cgitb; cgitb.enable()

    try:
        from html import escape   # Python 3.2 and later
    except ImportError:
        from cgi import escape    # Python 2

    # State
    errors = []

    # Process form input
    form = cgi.FieldStorage()

    limit         = None
    output_format = None
    stylesheet    = None
    title         = None
    description   = None
    query_type    = None

    # getfirst() returns None for a missing field and copes with a field
    # that was submitted more than once (form[name] is a list in that case).
    raw_limit = form.getfirst("limit")
    if raw_limit is not None:
        try:
            limit = int(raw_limit)
        except ValueError:
            errors.append("Invalid size limit: " + raw_limit)

    if not errors:
        output_format = form.getfirst("format")
        stylesheet    = form.getfirst("stylesheet")
        title         = form.getfirst("title")
        description   = form.getfirst("description")
        query_type    = form.getfirst("query")

        # These values come straight from the request and are written into
        # a text/html response, so neutralize markup at this boundary.
        if stylesheet is not None:
            stylesheet = escape(stylesheet, True)
        if title is not None:
            title = escape(title, True)
        if description is not None:
            description = escape(description, True)

        if query_type and query_type not in query_types:
            errors.append("Invalid query type: " + query_type)

        # Reject an unknown format before paying for the network fetch.
        if output_format and output_format not in ("page", "include"):
            errors.append("Unknown format: " + output_format)

    if not errors:
        try:
            # Get data
            itms_data = get(limit=limit, query_type=query_type)

            # Generate response
            if output_format == "include":
                generate_html_include(itms_data,
                                      title       = title,
                                      description = description)
            else:
                generate_response_page(itms_data,
                                       stylesheet  = stylesheet,
                                       title       = title,
                                       description = description)

        except Exception as e:
            # Top-level CGI boundary: report the failure to the client.
            errors.append(e)

    if errors:
        print('Content-type: text/plain; charset=UTF-8')
        print('')
        for error in errors:
            print(str(error))


##
# Run
##

if __name__ == '__main__':
    import os

    # Only act when invoked by a web server through the CGI interface.
    if os.environ.get("GATEWAY_INTERFACE", "")[:4] == "CGI/":
        run_cgi()