Tag Archives: Python

Using OpenCV for great customer service

OpenCV is an Open Source Computer Vision library that can be used in a variety of applications. There are a few wrappers for it that will expose the OpenCV API in a number of languages, but we will look at the Python wrapper in this post.

One application that I was thinking could be built very quickly and easily is using facial recognition to look up a customer before serving them. A simple, cheap webcam mounted at the entrance to a service centre captures people’s faces as they enter the building. Each capture can then be matched against a database of images to identify the customer and bring up all their details immediately on the service centre agent’s terminal. If the customer is new, the agent can capture their info for next time.

Privacy issues aside, this should be relatively easy to implement.

import sys
import cv2.cv as cv
from optparse import OptionParser

# Parameters for haar detection
# From the API:
# The default parameters (scale_factor=2, min_neighbors=3, flags=0) are tuned
# for accurate yet slow object detection. For a faster operation on real video
# images the settings are:
# scale_factor=1.2, min_neighbors=2, flags=CV_HAAR_DO_CANNY_PRUNING,
# min_size=<minimum possible face size>

min_size = (20, 20)
image_scale = 2
haar_scale = 1.2
min_neighbors = 2
haar_flags = 0

def detect_and_draw(img, cascade):
    # allocate temporary images
    gray = cv.CreateImage((img.width,img.height), 8, 1)
    small_img = cv.CreateImage((cv.Round(img.width / image_scale),
                   cv.Round (img.height / image_scale)), 8, 1)

    # convert color input image to grayscale
    cv.CvtColor(img, gray, cv.CV_BGR2GRAY)

    # scale input image for faster processing
    cv.Resize(gray, small_img, cv.CV_INTER_LINEAR)

    cv.EqualizeHist(small_img, small_img)

    if cascade:
        t = cv.GetTickCount()
        faces = cv.HaarDetectObjects(small_img, cascade, cv.CreateMemStorage(0),
                                     haar_scale, min_neighbors, haar_flags, min_size)
        t = cv.GetTickCount() - t
        print "detection time = %gms" % (t/(cv.GetTickFrequency()*1000.))
        if faces:
            for ((x, y, w, h), n) in faces:
                # the input to cv.HaarDetectObjects was resized, so scale the
                # bounding box of each face and convert it to two CvPoints
                pt1 = (int(x * image_scale), int(y * image_scale))
                pt2 = (int((x + w) * image_scale), int((y + h) * image_scale))
                cv.Rectangle(img, pt1, pt2, cv.RGB(255, 0, 0), 3, 8, 0)

    cv.ShowImage("result", img)

if __name__ == '__main__':

    parser = OptionParser(usage = "usage: %prog [options] [filename|camera_index]")
    parser.add_option("-c", "--cascade", action="store", dest="cascade", type="str", help="Haar cascade file, default %default", default = "../data/haarcascades/haarcascade_frontalface_alt.xml")
    (options, args) = parser.parse_args()

    cascade = cv.Load(options.cascade)

    if len(args) != 1:
        parser.print_help()
        sys.exit(1)

    input_name = args[0]
    if input_name.isdigit():
        capture = cv.CreateCameraCapture(int(input_name))
    else:
        capture = None

    cv.NamedWindow("result", 1)

    if capture:
        frame_copy = None
        while True:
            frame = cv.QueryFrame(capture)
            if not frame:
                cv.WaitKey(0)
                break
            if not frame_copy:
                frame_copy = cv.CreateImage((frame.width,frame.height),
                                            cv.IPL_DEPTH_8U, frame.nChannels)
            if frame.origin == cv.IPL_ORIGIN_TL:
                cv.Copy(frame, frame_copy)
            else:
                cv.Flip(frame, frame_copy, 0)

            detect_and_draw(frame_copy, cascade)

            if cv.WaitKey(10) >= 0:
                break
    else:
        image = cv.LoadImage(input_name, 1)
        detect_and_draw(image, cascade)
        cv.WaitKey(0)

    cv.DestroyWindow("result")


So as you can see, by using the bundled OpenCV Haar detection XML documents for frontal face detection, we are almost there already! Try it with:

python ./facedetect.py -c /usr/local/share/OpenCV/haarcascades/haarcascade_frontalface_alt.xml 0

Where 0 is the index of the camera you wish to use.
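
The detector runs on a copy of the frame shrunk by image_scale, so every rectangle it returns has to be scaled back up before it is drawn on the full-size image. Here is a minimal sketch of just that arithmetic in plain Python 3, with no OpenCV needed. Note that scale_box is a hypothetical helper name and not part of the OpenCV API.

```python
def scale_box(box, image_scale):
    """Map an (x, y, w, h) box found on the shrunken image back onto the
    full-size frame, returned as its top-left and bottom-right corners."""
    x, y, w, h = box
    top_left = (int(x * image_scale), int(y * image_scale))
    bottom_right = (int((x + w) * image_scale), int((y + h) * image_scale))
    return top_left, bottom_right


if __name__ == "__main__":
    # A 30x30 face found at (10, 20) on the half-size image
    print(scale_box((10, 20, 30, 30), 2))
```

A larger image_scale makes detection faster, but faces smaller than min_size on the shrunken copy will be missed.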

CORS in Python Bottle Framework

I was hacking up a quick mobile web app and needed to do an AJAX (jQuery based) POST and PUT to a simple Python Bottle REST server. It turned out that the browser was sending an HTTP OPTIONS request before the POST, and the server was rejecting it with a 405. This, I found out, was a cross origin issue: the OPTIONS request is the CORS preflight, and my route did not accept it.

The easiest way that I could come up with to fix the CORS (Cross Origin Resource Sharing) problem was to write a simple bottle.py decorator that would enable me to do cross origin posts easily.

Firstly, the jQuery AJAX looked something like this on the client side:

    $.ajax({
        url: "http://host:port/save",
        type: "POST",
        data: JSON.stringify( { "mydata" : "Some stuff" } ),
        contentType: "application/json",
        success: function(data){
            $("#result").html('got data: ' + data);
        },
        error: function(jqXHR, textStatus, errorThrown){
            $("#result").html('There was an error while submitting: ' + errorThrown);
        }
    });

and the Python code looks like this:

from bottle import request, response, route

# the decorator
def enable_cors(fn):
    def _enable_cors(*args, **kwargs):
        # set CORS headers
        response.headers['Access-Control-Allow-Origin'] = '*'
        response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, OPTIONS'
        response.headers['Access-Control-Allow-Headers'] = 'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'

        if request.method != 'OPTIONS':
            # actual request; reply with the actual response
            return fn(*args, **kwargs)
        # preflight (OPTIONS) request; the headers alone are the answer

    return _enable_cors

You can then add the decorator to whichever of your @route functions need CORS enabled, using the simple @enable_cors decorator:

@route('/save', method=['OPTIONS', 'POST'])
@enable_cors
def save():
    # Rest of your code here...
    pass

Note that in the method I have set a list of the allowed methods, which include HTTP OPTIONS…

That is it! Javascript folks will tell you to rather do a $.post query, but I do prefer this method (I am more of a server side type of dude…)

There are other ways to achieve this, but in my opinion this is the simplest and most elegant.
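
One of those other ways is to handle CORS once for the whole application instead of per route. Below is a minimal sketch in Python 3 using a plain WSGI middleware. This is a different technique from the decorator above, and the names CorsMiddleware, demo_app and call are made up for illustration. It uses only the standard library, so it can be tried without Bottle installed.

```python
class CorsMiddleware:
    """Wrap any WSGI app: add CORS headers and answer OPTIONS preflights."""

    HEADERS = [
        ("Access-Control-Allow-Origin", "*"),
        ("Access-Control-Allow-Methods", "GET, POST, PUT, OPTIONS"),
        ("Access-Control-Allow-Headers",
         "Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token"),
    ]

    def __init__(self, app):
        self.app = app

    def __call__(self, environ, start_response):
        if environ.get("REQUEST_METHOD") == "OPTIONS":
            # Preflight: reply with the headers only, never call the app.
            start_response("200 OK", self.HEADERS + [("Content-Length", "0")])
            return [b""]

        def cors_start_response(status, headers, exc_info=None):
            # Actual request: append the CORS headers to the app's own.
            return start_response(status, list(headers) + self.HEADERS, exc_info)

        return self.app(environ, cors_start_response)


def demo_app(environ, start_response):
    # Hypothetical stand-in for the real application.
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"saved"]


def call(app, method):
    """Invoke a WSGI app directly and capture status, headers and body."""
    captured = {}

    def start_response(status, headers, exc_info=None):
        captured["status"] = status
        captured["headers"] = dict(headers)

    body = b"".join(app({"REQUEST_METHOD": method}, start_response))
    return captured["status"], captured["headers"], body


if __name__ == "__main__":
    wrapped = CorsMiddleware(demo_app)
    print(call(wrapped, "OPTIONS")[0])
    print(call(wrapped, "POST")[2])
```

Since a Bottle application is itself a WSGI callable, wrapping it along the lines of run(app=CorsMiddleware(bottle.app())) should give the same effect without decorating each route, although that is untested here.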

How to write a dead simple REST API using Python Bottle

Reposted from old site – original date: Thursday 24 November 2011

REST APIs should be simple. Following from that statement, implementing REST APIs should be simple too. Enter the Bottle Python web framework: http://bottlepy.org/docs/dev/

Bottle is a simple WSGI micro framework written in Python. I experimented with both Bottle and Web.py and found that although web.py is faster, Bottle is way easier to get going with on a simple app.

In order to quickly prototype a REST API with a MongoDB backend, you only need a few lines of code:

import json
import bottle
from bottle import route, run, request, abort
from pymongo import Connection
from pymongo.errors import PyMongoError

connection = Connection('localhost', 27017)
db = connection.mydatabase

@route('/documents', method='PUT')
def put_document():
    # read the whole request body, not just its first line
    data = request.body.read()
    if not data:
        abort(400, 'No data received')
    entity = json.loads(data)
    if '_id' not in entity:
        abort(400, 'No _id specified')
    try:
        # insert the document, or overwrite the one with the same _id
        db['documents'].save(entity)
    except PyMongoError as e:
        # PyMongoError is the base class of pymongo's exceptions
        abort(400, str(e))

@route('/documents/:id', method='GET')
def get_document(id):
    entity = db['documents'].find_one({'_id':id})
    if not entity:
        abort(404, 'No document with id %s' % id)
    # Bottle serialises a returned dict to JSON
    return entity

run(host='localhost', port=8080)

Which will just about do it. The requirements are the excellent pymongo package (get it at http://pypi.python.org/pypi/pymongo/ or just grab the Ubuntu package) and the Bottle package (check the docs for install instructions).

The code above is pretty self explanatory, but if you would like some additional notes, please do leave a comment and I will get back to you. Note the use of the route decorator, this is the bit that makes your life easy!
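
For anyone who wants those additional notes on the PUT handler: most of it is input checking before anything touches the database. Here is a rough sketch of that checking on its own, in Python 3 and without Bottle or MongoDB. Note that parse_document is a hypothetical helper, and the "not valid JSON" branch is an extra check that the handler above does not perform.

```python
import json


def parse_document(raw_body):
    """Return (status, payload): 200 with the parsed entity,
    or 400 with a short error message."""
    if not raw_body:
        return 400, "No data received"
    try:
        entity = json.loads(raw_body)
    except ValueError:
        # json.loads raises a ValueError subclass on malformed input
        return 400, "Body is not valid JSON"
    if not isinstance(entity, dict) or "_id" not in entity:
        return 400, "No _id specified"
    return 200, entity


if __name__ == "__main__":
    print(parse_document('{"_id": "doc1", "name": "Test"}'))
    print(parse_document('{"name": "Test"}'))
```

Keeping the checks in a plain function like this makes them easy to test without starting a server.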

Happy RESTing… ;)

Laptop tracking system (cheap)

Reposted from old site – original date: Wednesday 7 September 2011

Following on from a discussion on a work forum, I decided to have a stab at writing a super simple laptop tracking system. The main aim of the system will be to track stolen laptops and (hopefully) recover them somehow.

After a little bit of consideration, I decided to have a look at doing this in a client/server way with a MongoDB back end. The following is a simple, but workable system that uses the python bindings to the DBUS messaging system on Linux (tested on Ubuntu 11.04), so I am doubtful that this could be used for anything other than that. That being said, however, the client bit simply needs to send through wifi access points and MAC addresses to get a triangulation on the position of the laptop, so I am pretty sure that this can be achieved with relative ease on other platforms.

Triangulation is done via a Google API, and the coordinates as well as the accuracy level are then inserted into the MongoDB back end for mapping or whatever else needs to be done with the data. Features that the client should also probably support include bricking the machine remotely or something to that effect, as well as possibly sending messages or SOS tweets to let people know that it is not in its rightful owner’s possession.

Enough rambling! To the code!


As I said, the code uses python-dbus and the standard json and urllib modules. You can install python-dbus with an apt-get install python-dbus.

import dbus
import json
import urllib

NM = 'org.freedesktop.NetworkManager'
NMP = '/org/freedesktop/NetworkManager'
NMI = NM + '.Device'
PI = 'org.freedesktop.DBus.Properties'

def list_ssids():
    bus = dbus.SystemBus()
    nm = bus.get_object(NM,NMP)
    nmi = dbus.Interface(nm,NM)
    # collect access points across all wifi devices, so that the
    # function still returns a list when no wifi device is present
    wifi = []
    # Iterate over the devices queried via the interface
    for dev in nmi.GetDevices():
        # get each and bind a property interface to it
        devo = bus.get_object(NM,dev)
        devpi = dbus.Interface(devo,PI)
        # DeviceType 2 is a wifi device
        if devpi.Get(NMI,'DeviceType') == 2:
            wdevi = dbus.Interface(devo,NMI + '.Wireless')
            for ap in wdevi.GetAccessPoints():
                apo = bus.get_object(NM,ap)
                api = dbus.Interface(apo,PI)
                # Ssid is a byte array; join it into a plain string
                ssid = ''.join(["%c" % b for b in api.Get(NM + '.AccessPoint', "Ssid", byte_arrays=True)])
                mac = ''.join(["%c" % b for b in api.Get(NM + '.AccessPoint', "HwAddress", byte_arrays=True)])
                wifi.append({'ssid': ssid, 'mac': mac})
    return wifi

if __name__ == '__main__':
  ap = list_ssids()
  data = json.dumps(ap)
  params = urllib.urlencode({'wifi': data})
  f = urllib.urlopen("", params)

You will need to modify the post URL at the end to somewhere meaningful for yourself of course.
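
The client posts a single form field called wifi that holds the access point list as a JSON string, and whatever receives it has to undo both layers. Here is a small Python 3 sketch of that round trip (the client above is Python 2, where urlencode lives in urllib rather than urllib.parse). Note that build_payload and read_payload are hypothetical helper names, and the access point is made up.

```python
import json
from urllib.parse import parse_qs, urlencode


def build_payload(access_points):
    """Encode the access point list the way the client posts it:
    one form field named 'wifi' holding a JSON string."""
    return urlencode({"wifi": json.dumps(access_points)})


def read_payload(body):
    """What the receiving side has to do: pull out 'wifi', decode the JSON."""
    return json.loads(parse_qs(body)["wifi"][0])


if __name__ == "__main__":
    # Made-up access point, for illustration only
    aps = [{"ssid": "CoffeeShop", "mac": "00:11:22:33:44:55"}]
    body = build_payload(aps)
    print(body)
    print(read_payload(body) == aps)
```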

Initially, I did the client using the PHP ext/dbus, but decided against that due to the fact that it is hard to install and nobody really uses it…

The server side is just as simple. You will need a Mongodb instance running on the server and then you need a simple script to catch the POSTs. NOTE: This script is just a concept, so if you are actually going to do something like this, clean it up!

$wifi = $_POST['wifi'];
$wifi = json_decode($wifi);
$request = array( 'version' => '1.1.0', 'host' => 'myurl.com', 'wifi_towers' => $wifi );
$c = curl_init();
curl_setopt( $c, CURLOPT_URL, 'https://www.google.com/loc/json' );
curl_setopt( $c, CURLOPT_POST, 1 );
curl_setopt( $c, CURLOPT_POSTFIELDS, json_encode( $request ) );
curl_setopt( $c, CURLOPT_RETURNTRANSFER, true );
$result = json_decode( curl_exec( $c ) )->location;

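$fields = array(
            'lat' => urlencode($result->latitude),
            'lon' => urlencode($result->longitude),
            'accuracy' => urlencode($result->accuracy),
            'laptopid' => urlencode('16cptl-pscott'),
);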

// connect
$m = new Mongo();
// select a database
$db = $m->laptoptrack;
$collection = $db->lappoints;

// add a record
$obj = array( "loc" => array("lon" => floatval($fields['lon']), "lat" => floatval($fields['lat'])), "lon" => floatval($fields['lon']), "lat" => floatval($fields['lat']), "accuracy" => intval($fields['accuracy']), "laptopid" => $fields['laptopid'], "wifi" => json_encode($wifi));
$collection->ensureIndex(array('loc' => "2d"));
$collection->insert($obj, array('safe'=>true)); 

So from the above code, you will see that we create a 2d geospatial index on the Mongodb instance as well. Not sure if this is useful, but it will probably help in speeding up queries like “Gimme all the laptops that Company X owns in area Y” or something – if that is something that you would like to add to your query interface of course…

Also, I am not 100% sure of the legality of storing SSIDs with a location, so check that one first too!

Dead simple, works well.

I would say that the client bit should be on a cron job or similar that pings the service every hour or so.

Remember: Mongodb works best on 64bit OS. If you are using a 32bit arch, then you will only be able to store around 2GB data at a time. Depending on the need for historical records etc, keep that in mind…

Most importantly, HAVE FUN!

This looks pretty interesting!

Reposted from old site – original date: Sunday 6 June 2010

Description from announcement…

PiCloud, a cloud computing platform for the Python Programming Language, has released version 1.9 of its client library, cloud. PiCloud enables Python users to leverage the power of an on-demand, high performance, and auto scaling compute cluster with as few as three lines of code! No server management necessary. You can find out more here: http://www.picloud.com

What’s New:
* Store your data files on the cloud using our cloud.files interface!
* Greatly reduced the cloud library’s memory and cpu usage when sending large data.
* Map jobs begin processing before cloud.map() returns–large performance gains.
* Persistent user processes has in many cases reduced function overhead by over 50%.
* Increased network protocol reliability.
* Profiling is now disabled by default, but can be enabled with the _profile keyword.
* Bug fixes, and much more!

Full service description:
PiCloud is a cloud computing platform that integrates into the Python Programming Language. It enables you to leverage the compute power of Amazon Web Services without having to manage, maintain, or configure virtual servers.

PiCloud integrates seamlessly into your existing code base through a custom Python library, cloud. To offload the execution of a function to the cloud, all you must do is pass your desired function into the cloud library. PiCloud will then run the function on its high-performance and automatically-scaling cluster. We quickly scale our server capacity to meet your computational needs, and only charge you for the resources you actually consume. Getting on the cloud has never been this easy!

PiCloud improves the full cycle of software development and deployment. Functions that are run on PiCloud have their resource usage monitored, performance analyzed, and errors traced; we further aggregate all your functions to give you a bird’s eye view of your service. Through these introspective capabilities, PiCloud enables you to develop faster, easier, and smarter.

Common use cases for our platform:
* Scientific computing
* Video and image encoding
* Statistical analysis of data sets
* Real-time data processing
* Charts and graphs generation

Why Tweetdeck?

Reposted from old site – original date: Thursday 26 November 2009

I was thinking a bit about stuff last night, and especially the fact that I am under pressure from a lot of folks to move away from Tweetdeck (which I think is awesome) to a web based client like HootSuite, Seesmic web or any of the other AJAXY things out there (there are now loads to choose from). The bottom line is, yes, they probably do work well, and yes, that is great, but there are very distinct disadvantages to using a browser based client:

You need an open browser window, “maximised” to be useful.

IMO browser windows take up a LOT of desktop real estate, so not great when you have 6 shells open too, and, and, and…

I realise that Tweetdeck does take up quite a lot of resources, although it seems MUCH better in its latest incarnation, but I really like it.

Code wise, would you rather write a bunch of JavaScript and PHP or something to do a simple AJAX based Twitter client, or would you rather write a hundred or so lines of python to do the same thing on your desktop? Remembering that in a web environment you have to hack thread-like behaviour, while on desktop threads are a no-brainer to do background, non-blocking updates etc.

Basically, what I am saying is that RIA (Rich Internet Applications) rock. They bring the best of both worlds to the user in a controlled and resource happy environment. In a web sense, if all my users were using desktop clients, I would be over the moon, as the only thing I would have to serve is JSON or something similarly lightweight, saving my bandwidth bill and my hardware bill, as I could support way more folks than a heavier interface could!

The trick though is to save yourself time too. Write simple, cross platform desktop apps that are easily maintainable and package-able. I chose wxPython to demo with below. You don’t need whole whacks of code; you just need a minimal interface to hold together some glue code making use of existing packages. The code below is not pretty, but it is terse and to the point.

If you would like to improve the code, please feel free, but please try keep it less than 100 lines…

So can you write a simple, but useful Twitter client in less than 100 lines? Here is one in 98, including shebang…

#!/usr/bin/env python
import wx
import twitter
import time
import sys 

class RedirectText(object):
    def __init__(self,aWxTextCtrl):
        self.out = aWxTextCtrl
    def write(self,string):
        self.out.WriteText(string)

class MyFrame(wx.Frame):

    def __init__(self, *args, **kwds):
        wx.Frame.__init__(self, *args, **kwds)
        self.textUpdate = wx.TextCtrl(self, -1, "", style=wx.TE_PROCESS_ENTER|wx.TE_MULTILINE|wx.TE_AUTO_URL|wx.TE_LINEWRAP|wx.TE_WORDWRAP)
        self.updateButton = wx.Button(self, -1, "Update", style=wx.BU_EXACTFIT)
        self.panel = wx.Panel(self, wx.ID_ANY)
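        self.log = wx.TextCtrl(self.panel, wx.ID_ANY, "",  style=wx.TE_MULTILINE|wx.TE_WORDWRAP)

        self.__set_properties()
        self.__do_layout()
        # send everything that gets printed to the log control
        sys.stdout = RedirectText(self.log)

        self.Bind(wx.EVT_TEXT_ENTER, self.onUpdate, self.textUpdate)
        self.Bind(wx.EVT_BUTTON, self.onUpdate, self.updateButton)
        # load the timeline once; fetchTwitter then sets up the refresh timer
        self.fetchTwitter(None)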

    def __set_properties(self):
        self.username = 'paulscott56'
        self.password = 'password123'
        self.encoding = None
        self.SetSize(wx.DLG_SZE(self, (205, 234)))

    def __do_layout(self):
        sizer_1 = wx.BoxSizer(wx.VERTICAL)
        sizer_2 = wx.BoxSizer(wx.VERTICAL)
        sizer_3 = wx.BoxSizer(wx.HORIZONTAL)
        sizer_2.Add(self.textUpdate, 1, wx.ALL|wx.EXPAND, 4)
        sizer_2.Add(self.updateButton, 0, wx.ALL|wx.EXPAND, 4)
        sizer_3.Add(self.log, 4, wx.EXPAND|wx.ALIGN_CENTER_HORIZONTAL|wx.ALIGN_CENTER_VERTICAL, 4)
        sizer_2.Add(self.panel, 1, wx.EXPAND, 0)
        sizer_1.Add(sizer_2, 1, wx.EXPAND, 0)
        self.panel.SetSizer(sizer_3)
        self.SetSizer(sizer_1)
        self.Layout()

    def onUpdate(self, event): 
        self.api = twitter.Api(username=self.username, password=self.password, input_encoding=self.encoding)
        message = self.textUpdate.GetValue()
        try:
            status = self.api.PostUpdate(message)
        except UnicodeDecodeError:
            print "Your message could not be encoded.  Perhaps it contains non-ASCII characters? "
            return
        print "%s just posted: %s" % (status.user.name, status.text)

    def fetchTwitter(self,  event):
        self.api = twitter.Api(username=self.username, password=self.password, input_encoding=self.encoding)
        statuses = self.api.GetFriendsTimeline(user=self.username)
        for s in statuses:
            # make the text thing
            sn = s.user.screen_name
            txt = s.text
            when = s.relative_created_at
            stringthing = sn+": "+txt+" "+when+'\r\n'
            # redirect text
            print stringthing
        # make a timer here
        timeLapse = 60000
        self.timer = wx.Timer(self)
        self.timer.Start(timeLapse)
        self.Bind(wx.EVT_TIMER, self.fetchTwitter, self.timer)

# end of class MyFrame

if __name__ == "__main__":
    app = wx.PySimpleApp(0)
    Twitrrr = MyFrame(None, -1, "")
    app.SetTopWindow(Twitrrr)
    Twitrrr.Show()
    app.MainLoop()


Very dirty site monitor

Reposted from old site – original date: 2009-05-26 11:54:39

I needed a quick script to monitor a bunch of sites quickly and *very* simply. I looked at doing this all in a single thread with a ping, but then decided to go a non blocking route and make the thing a little more thread safe. This was done so that multiple sites that actually could go down could be monitored with relative accuracy, without a single site blocking the rest.

OK, enough flapping, here is the code (Python this time)

import threading
import os, popen2, select, signal
import Queue
import sys, xmpp
import time



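PING_INTERVAL = 5  # seconds between pings; assumed value, adjust to taste

jidparams={'jid':'[email protected]/pinger', 'password':'somepass'}
jid=xmpp.protocol.JID(jidparams['jid'])
cl=xmpp.Client(jid.getDomain(), debug=[])
con=cl.connect() ##proxy={'host':'cache.company.co.za','port':'8080','user':'userdude','password':'le password'})
if not con:
    print 'could not connect!'
    sys.exit(1)

auth=cl.auth(jid.getNode(), jidparams['password'], resource=jid.getResource())
if not auth:
    print 'could not authenticate!'
    sys.exit(1)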

def log(s):
    print s
    text = s
    tojid='[email protected]'
    # send the same message to the admin over XMPP
    cl.send(xmpp.protocol.Message(tojid, text))

class Pinger(threading.Thread):
    def __init__(self, queue, address, *args, **kwargs):
        self.address = address
        self.queue = queue
        threading.Thread.__init__(self, *args, **kwargs)
        self.stop = 0

    def run(self):
        # merge stderr into stdout so that error lines from ping are read too
        child = popen2.Popen3("ping -i %i %s 2>&1" % (PING_INTERVAL, self.address))
        while 1:
            ready_fds = select.select([child.fromchild], [], [])
            line = child.fromchild.readline()
            if line.find("Destination Host Unreachable") >= 0:
                log("host %s is down" % self.address)

            if self.stop:
                os.kill(child.pid, signal.SIGTERM)
                break

def main():
    pinglist = ["www.company1.co.za", "it.company.co.za", "something.company.co.za", ""]
    threads = []
    queue = Queue.Queue()

    for adr in pinglist:
        threads.append(Pinger(queue, adr))

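    for thread in threads:
        thread.start()

    try:
        # the pinger threads do the work; just idle until Ctrl-C
        while 1:
            time.sleep(1)
    except KeyboardInterrupt:
        pass

    for thread in threads:
        thread.stop = 1

    for thread in threads:
        thread.join()

    # Ok, I've had enough of you stoopid threads.
    sys.exit(0)

if __name__ == "__main__":
    main()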
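
The reason for going threaded is that one slow or dead site cannot hold up the checks on the others. The same idea can be sketched in Python 3 with the standard library only. This is a one-shot check rather than the continuous ping above. The names monitor and fake_check are hypothetical, and fake_check stands in for a real ping or HTTP request.

```python
import queue
import threading


def monitor(sites, check, results):
    """Start one worker thread per site; each puts (site, is_up) on results."""
    def worker(site):
        try:
            up = bool(check(site))
        except Exception:
            up = False  # a check that blows up counts as "down"
        results.put((site, up))

    threads = [threading.Thread(target=worker, args=(s,)) for s in sites]
    for t in threads:
        t.start()
    return threads


def fake_check(site):
    # Hypothetical stand-in for a real ping or HTTP request.
    return not site.startswith("down.")


if __name__ == "__main__":
    results = queue.Queue()
    threads = monitor(["www.example.com", "down.example.com"], fake_check, results)
    for t in threads:
        t.join()
    while not results.empty():
        site, up = results.get()
        print("%s is %s" % (site, "up" if up else "down"))
```

The queue is what keeps this thread safe: workers only ever put results on it, and the main thread is the only one that reads them.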

Basic pyinotify usage

Reposted from old site – original date: 2009-04-26 14:07:17

Basic pyinotify usage:

import os
from pyinotify import WatchManager, Notifier, ThreadedNotifier, EventsCodes, ProcessEvent
wm = WatchManager()
mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE  # watched events

class PTmp(ProcessEvent):
    def process_IN_CREATE(self, event):
        print "Create: %s" %  os.path.join(event.path, event.name)

    def process_IN_DELETE(self, event):
        print "Remove: %s" %  os.path.join(event.path, event.name)

notifier = Notifier(wm, PTmp())
wdd = wm.add_watch('/home/paul/Desktop', mask, rec=True)

while True:  # loop forever
    try:
        # process the queue of events as explained above
        notifier.process_events()
        if notifier.check_events():
            # read notified events and enqueue them
            notifier.read_events()
        # you can do some tasks here...
    except KeyboardInterrupt:
        # destroy the inotify's instance on this interrupt (stop monitoring)
        notifier.stop()
        break

Python tab completion

Reposted from old site – original date: 2009-04-17 09:01:28

Thanks to John ( http://www.jdtangney.com ) for pointing this out:

# python startup file
import readline
import rlcompleter
import atexit
import os
# tab completion
readline.parse_and_bind('tab: complete')
# history file
histfile = os.path.join(os.environ['HOME'], '.pythonhistory')
try:
    readline.read_history_file(histfile)
except IOError:
    # no history file yet; it gets written on exit
    pass
atexit.register(readline.write_history_file, histfile)
del os, histfile, readline, rlcompleter