Category Archives: Python

Using OpenCV for great customer service

OpenCV is an Open Source Computer Vision library that can be used in a variety of applications. There are a few wrappers for it that will expose the OpenCV API in a number of languages, but we will look at the Python wrapper in this post.

One application that I think could be built quickly and easily is using facial recognition to look up a customer before serving them. A cheap webcam mounted at the entrance to a service centre captures people’s faces as they enter the building. Each capture is then matched against a database of images to identify the customer and bring up all their details immediately on the service centre agent’s terminal. If the customer is new, the agent can capture their info for next time.

Privacy issues aside, this should be relatively easy to implement.

#!/usr/bin/python
import sys
import cv2.cv as cv
from optparse import OptionParser

# Parameters for haar detection
# From the API:
# The default parameters (scale_factor=2, min_neighbors=3, flags=0) are tuned
# for accurate yet slow object detection. For a faster operation on real video
# images the settings are:
# scale_factor=1.2, min_neighbors=2, flags=CV_HAAR_DO_CANNY_PRUNING,
# min_size=<minimum possible face size>

min_size = (20, 20)
image_scale = 2
haar_scale = 1.2
min_neighbors = 2
haar_flags = 0

def detect_and_draw(img, cascade):
    # allocate temporary images
    gray = cv.CreateImage((img.width,img.height), 8, 1)
    small_img = cv.CreateImage((cv.Round(img.width / image_scale),
                   cv.Round (img.height / image_scale)), 8, 1)

    # convert color input image to grayscale
    cv.CvtColor(img, gray, cv.CV_BGR2GRAY)

    # scale input image for faster processing
    cv.Resize(gray, small_img, cv.CV_INTER_LINEAR)

    cv.EqualizeHist(small_img, small_img)

    if(cascade):
        t = cv.GetTickCount()
        faces = cv.HaarDetectObjects(small_img, cascade, cv.CreateMemStorage(0),
                                     haar_scale, min_neighbors, haar_flags, min_size)
        t = cv.GetTickCount() - t
        print "detection time = %gms" % (t/(cv.GetTickFrequency()*1000.))
        if faces:
            for ((x, y, w, h), n) in faces:
                # the input to cv.HaarDetectObjects was resized, so scale the
                # bounding box of each face and convert it to two CvPoints
                pt1 = (int(x * image_scale), int(y * image_scale))
                pt2 = (int((x + w) * image_scale), int((y + h) * image_scale))
                cv.Rectangle(img, pt1, pt2, cv.RGB(255, 0, 0), 3, 8, 0)

    cv.ShowImage("result", img)

if __name__ == '__main__':

    parser = OptionParser(usage = "usage: %prog [options] [filename|camera_index]")
    parser.add_option("-c", "--cascade", action="store", dest="cascade", type="str", help="Haar cascade file, default %default", default = "../data/haarcascades/haarcascade_frontalface_alt.xml")
    (options, args) = parser.parse_args()

    cascade = cv.Load(options.cascade)

    if len(args) != 1:
        parser.print_help()
        sys.exit(1)

    input_name = args[0]
    if input_name.isdigit():
        capture = cv.CreateCameraCapture(int(input_name))
    else:
        capture = None

    cv.NamedWindow("result", 1)

    if capture:
        frame_copy = None
        while True:
            frame = cv.QueryFrame(capture)
            if not frame:
                cv.WaitKey(0)
                break
            if not frame_copy:
                frame_copy = cv.CreateImage((frame.width,frame.height),
                                            cv.IPL_DEPTH_8U, frame.nChannels)
            if frame.origin == cv.IPL_ORIGIN_TL:
                cv.Copy(frame, frame_copy)
            else:
                cv.Flip(frame, frame_copy, 0)

            detect_and_draw(frame_copy, cascade)

            if cv.WaitKey(10) >= 0:
                break
    else:
        image = cv.LoadImage(input_name, 1)
        detect_and_draw(image, cascade)
        cv.WaitKey(0)

    cv.DestroyWindow("result")

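So as you can see, by using the bundled OpenCV Haar cascade XML files for frontal face detection, we are almost there already! The script finds and boxes the faces in each frame; matching a face against customer records would be the next step. Try it with: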

python ./facedetect.py -c /usr/local/share/OpenCV/haarcascades/haarcascade_frontalface_alt.xml 0

Where 0 is the index of the camera you wish to use.
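
The detector runs on a shrunken copy of each frame, so every box it returns has to be scaled back up before it is drawn. A minimal sketch of just that step in plain Python (the helper name `scale_box` is my own, not part of OpenCV):

```python
def scale_box(box, image_scale):
    """Map an (x, y, w, h) box found on the shrunken image back to
    two corner points on the full-size frame."""
    x, y, w, h = box
    top_left = (int(x * image_scale), int(y * image_scale))
    bottom_right = (int((x + w) * image_scale), int((y + h) * image_scale))
    return top_left, bottom_right


# A face found at (40, 30), 50 pixels square, on the half-size image
corners = scale_box((40, 30, 50, 50), 2)
print(corners)
```

With image_scale = 2, as in the script, the corners land at double the detected coordinates.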

An introduction to Apache Spark

What is Apache Spark?

Apache Spark is a fast and general engine for large-scale data processing.

Related documents

http://spark.apache.org

You can find the latest Spark documentation, including a programming
guide, on the project webpage at http://spark.apache.org/documentation.html.

Setup

Spark needs to be downloaded and installed on your local machine.
Spark requires Scala 2.10. The project is built using Simple Build Tool (SBT),
which can be obtained from http://www.scala-sbt.org. If SBT is installed, the
build uses the system version of sbt; otherwise it attempts to download it
automatically. To build Spark and its example programs, run:

./sbt/sbt assembly

Once you’ve built Spark, the easiest way to start using it is the shell:

./bin/spark-shell

Or, for the Python API, the Python shell (`./bin/pyspark`).

Spark also comes with several sample programs in the `examples` directory.
To run one of them, use `./bin/run-example <class> <params>`. For example:

./bin/run-example org.apache.spark.examples.SparkLR local[2]

will run the Logistic Regression example locally on 2 CPUs.

Each of the example programs prints usage help if no params are given.

All of the Spark samples take a `<master>` parameter that is the cluster URL
to connect to. This can be a mesos:// or spark:// URL, or “local” to run
locally with one thread, or “local[N]” to run locally with N threads.
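
To make the accepted `<master>` forms concrete, here is a small Python sketch that sorts a master string into the cases listed above. It is only an illustration of the format, not how Spark itself parses the value; the function name `describe_master` and the example host `host:7077` are my own assumptions.

```python
import re


def describe_master(master):
    """Return a (mode, detail) pair for a Spark master string."""
    if master == "local":
        # plain "local" means a single local thread
        return ("local", 1)
    match = re.match(r"^local\[(\d+)\]$", master)
    if match:
        # "local[N]" means N local threads
        return ("local", int(match.group(1)))
    for scheme in ("spark", "mesos"):
        prefix = scheme + "://"
        if master.startswith(prefix):
            # cluster URL: keep whatever follows the scheme
            return (scheme, master[len(prefix):])
    raise ValueError("unrecognised master: %r" % master)


print(describe_master("local[2]"))
print(describe_master("spark://host:7077"))
```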

Tests

Testing first requires building Spark. Once Spark is built, tests
can be run using:

`./sbt/sbt test`

Hadoop versions

Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported
storage systems. Because the protocols have changed in different versions of
Hadoop, you must build Spark against the same version that your cluster runs.
You can change the version by setting the `SPARK_HADOOP_VERSION` environment
variable when building Spark.

For Apache Hadoop versions 1.x, Cloudera CDH MRv1, and other Hadoop
versions without YARN, use:

# Apache Hadoop 1.2.1
$ SPARK_HADOOP_VERSION=1.2.1 sbt/sbt assembly

# Cloudera CDH 4.2.0 with MapReduce v1
$ SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.2.0 sbt/sbt assembly

For Apache Hadoop 2.2.X, 2.1.X, 2.0.X, 0.23.x, Cloudera CDH MRv2, and other Hadoop versions
with YARN, also set `SPARK_YARN=true`:

# Apache Hadoop 2.0.5-alpha
$ SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly

# Cloudera CDH 4.2.0 with MapReduce v2
$ SPARK_HADOOP_VERSION=2.0.0-cdh4.2.0 SPARK_YARN=true sbt/sbt assembly

# Apache Hadoop 2.2.X and newer
$ SPARK_HADOOP_VERSION=2.2.0 SPARK_YARN=true sbt/sbt assembly

When developing a Spark application, specify the Hadoop version by adding the
“hadoop-client” artifact to your project’s dependencies. For example, if you’re
using Hadoop 1.2.1 and build your application using SBT, add this entry to
`libraryDependencies`:

"org.apache.hadoop" % "hadoop-client" % "1.2.1"

If your project is built with Maven, add this to your POM file’s `<dependencies>` section:

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>1.2.1</version>
</dependency>

Spark could be very well suited to more in-depth data mining from social streams like Twitter/Facebook.
Spark has an advanced DAG execution engine that supports cyclic data flow and in-memory computing.
Write applications quickly in Java, Scala or Python.
Spark offers over 80 high-level operators that make it easy to build parallel apps. And you can use it interactively from the Scala and Python shells.
Combine SQL, streaming, and complex analytics.
Spark powers a stack of high-level tools including Shark for SQL, MLlib for machine learning, GraphX, and Spark Streaming. You can combine these frameworks seamlessly in the same application.
Spark can run on Hadoop 2's YARN cluster manager, and can read any existing Hadoop data.
If you have a Hadoop 2 cluster, you can run Spark without any installation needed. Otherwise, Spark is easy to run standalone or on EC2 or Mesos. It can read from HDFS, HBase, Cassandra, and any Hadoop data source.

Examples

Once Spark is built, open an interactive Scala shell with

bin/spark-shell

You can then start working with the engine

We will do a quick analysis of an apache2 log file (access.log)

// Load the file up for analysis
val textFile = sc.textFile("/var/log/apache2/access.log")
// Count the number of lines in the file
textFile.count()
// Display the first line of the file
textFile.first()
// Keep only the lines containing PHP
val linesWithPHP = textFile.filter(line => line.contains("PHP"))
// Count the lines with PHP
val phpLineCount = linesWithPHP.count()
// Do the classic MapReduce WordCount example
val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts.collect()
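
If the flatMap / map / reduceByKey chain looks opaque, the same three steps can be walked through in plain Python on a couple of lines of text. This is only a single-machine illustration of what each stage does, not Spark code; the two sample log lines are made up.

```python
from collections import defaultdict


def word_count(lines):
    # flatMap: split every line into words and flatten into one list
    words = [word for line in lines for word in line.split(" ")]
    # map: pair each word with a count of 1
    pairs = [(word, 1) for word in words]
    # reduceByKey: add up the counts that share the same word
    counts = defaultdict(int)
    for word, n in pairs:
        counts[word] += n
    return dict(counts)


sample = ["GET /index.php HTTP/1.1", "GET /about.html HTTP/1.1"]
print(word_count(sample))
```

Spark does the same thing, except that each stage is spread across the partitions of the file instead of running over one list in memory.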

Apps should be run as either Maven-packaged Java apps or Scala apps. Please refer to the documentation for a HOWTO.

Conclusion

Overall a good product, but some Hadoop expertise is required to set it up and run it successfully.
A mid-level or senior developer is required.
Some Scala expertise is an advantage.

CORS in Python Bottle Framework

I was hacking up a quick mobile web app and needed to do an AJAX (jQuery-based) POST and PUT to a simple Python Bottle REST server. It turned out that the browser was sending an HTTP OPTIONS request before the POST, and the server was rejecting it with a 405. This, I found out, was the preflight check that browsers make for cross-origin requests, and it needed a solution.

The easiest way I could come up with to fix the CORS (Cross-Origin Resource Sharing) problem was to write a simple bottle.py decorator that would enable me to do cross-origin posts easily.

Firstly, the jQuery AJAX looked something like this on the client side:

$.ajax({
    url: "http://host:port/save",
    type: "POST",
    data: JSON.stringify({ "mydata": "Some stuff" }),
    contentType: "application/json",
    success: function (data) {
        $("#result").html('got data: ' + data);
    },
    // jQuery passes (jqXHR, textStatus, errorThrown) to the error callback
    error: function (xhr, textStatus, errorThrown) {
        $("#result").html('There was an error while submitting: ' + errorThrown);
    }
});

and the Python code looks like this:

import bottle
from bottle import response, route

# the decorator
def enable_cors(fn):
    def _enable_cors(*args, **kwargs):
        # set CORS headers
        response.headers['Access-Control-Allow-Origin'] = '*'
        response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, OPTIONS'
        response.headers['Access-Control-Allow-Headers'] = 'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token'

        if bottle.request.method != 'OPTIONS':
            # actual request; reply with the actual response
            return fn(*args, **kwargs)
        # for OPTIONS (the preflight) only the headers go back, with an empty body

    return _enable_cors

You can then add the decorator to any of your @route methods that need CORS enabled, simply by putting @enable_cors under the route:

@route('/save', method=['OPTIONS', 'POST'])
@enable_cors
def save():
    # Rest of your code here...
    pass

Note that in the method I have set a list of the allowed methods, which include HTTP OPTIONS…
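
To see the two paths through the decorator without starting a server, here is a framework-free sketch of the same idea. It is not Bottle code: the handler signature (the HTTP method passed in as the first argument) and the (headers, body) return value are simplifications of my own so that the flow can be run and inspected directly.

```python
CORS_HEADERS = {
    'Access-Control-Allow-Origin': '*',
    'Access-Control-Allow-Methods': 'GET, POST, PUT, OPTIONS',
    'Access-Control-Allow-Headers':
        'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token',
}


def enable_cors(fn):
    """Wrap a handler; the wrapper takes the HTTP method and returns
    a (headers, body) pair instead of touching a real response object."""
    def _enable_cors(method, *args, **kwargs):
        headers = dict(CORS_HEADERS)
        if method == 'OPTIONS':
            # preflight: send the headers only, never call the handler
            return headers, None
        # actual request: headers plus whatever the handler returns
        return headers, fn(*args, **kwargs)
    return _enable_cors


@enable_cors
def save():
    return 'saved'


print(save('OPTIONS')[1])
print(save('POST')[1])
```

The browser only goes ahead with the real POST once the OPTIONS reply has come back carrying those headers, which is why the route has to accept both methods.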

That is it! JavaScript folks will tell you to rather do a $.post query, but I do prefer this method (I am more of a server side type of dude…)

There are other ways to achieve this, but in my opinion this is the simplest and most elegant.

How to write a dead simple REST API using Python Bottle

Reposted from old site – original date: Thursday 24 November 2011

REST APIs should be simple. Following from that statement, implementing REST APIs should be simple too. Enter the Bottle Python web framework: http://bottlepy.org/docs/dev/

Bottle is a simple WSGI micro framework written in Python. I experimented with both Bottle and Web.py and found that although web.py is faster, Bottle is way easier to get going with on a simple app.

In order to quickly prototype a REST API with a MongoDB backend, you only need a few lines of code:

import json
import bottle
from bottle import route, run, request, abort
from pymongo import Connection
from pymongo.errors import PyMongoError

connection = Connection('localhost', 27017)
db = connection.mydatabase

@route('/documents', method='PUT')
def put_document():
    # read the whole request body, not just its first line
    data = request.body.read()
    if not data:
        abort(400, 'No data received')
    entity = json.loads(data)
    if '_id' not in entity:
        abort(400, 'No _id specified')
    try:
        db['documents'].save(entity)
    except PyMongoError as e:
        # any database error is reported back to the client as a 400
        abort(400, str(e))

@route('/documents/:id', method='GET')
def get_document(id):
    entity = db['documents'].find_one({'_id':id})
    if not entity:
        abort(404, 'No document with id %s' % id)
    # Bottle serialises a returned dict to JSON
    return entity

run(host='localhost', port=8080)

Which will just about do it. The only requirements are the excellent pymongo package (get it at http://pypi.python.org/pypi/pymongo/ or just grab the Ubuntu package) and the Bottle package (check the docs for install instructions).

The code above is pretty self-explanatory, but if you would like some additional notes, please do leave a comment and I will get back to you. Note the use of the route decorator: this is the bit that makes your life easy!
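
The checks in the PUT handler can also be tried on their own, without Bottle or MongoDB running. A minimal sketch, assuming a helper of my own called `parse_document` that raises ValueError where the handler would call abort(400, ...):

```python
import json


def parse_document(raw):
    """Decode a request body and apply the same checks as the PUT handler."""
    if not raw:
        raise ValueError('No data received')
    entity = json.loads(raw)
    if '_id' not in entity:
        raise ValueError('No _id specified')
    return entity


print(parse_document('{"_id": "doc1", "name": "Test"}'))

try:
    parse_document('{"name": "Test"}')
except ValueError as e:
    print(e)
```

Keeping the validation in a plain function like this also makes it easy to test before any database is involved.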

Happy RESTing… ;)

Laptop tracking system (cheap)

Reposted from old site – original date: Wednesday 7 September 2011

Following on from a discussion on a work forum, I decided to have a stab at writing a super simple laptop tracking system. The main aim of the system will be to track stolen laptops and (hopefully) recover them somehow.

After a little bit of consideration, I decided to have a look at doing this in a client/server way with a MongoDB back end. The following is a simple, but workable system that uses the python bindings to the DBUS messaging system on Linux (tested on Ubuntu 11.04), so I am doubtful that this could be used for anything other than that. That being said, however, the client bit simply needs to send through wifi access points and MAC addresses to get a triangulation on the position of the laptop, so I am pretty sure that this can be achieved with relative ease on other platforms.

Triangulation is done via a Google API, and the coordinates as well as the accuracy level are then inserted into the MongoDB back end for mapping or whatever else needs to be done with the data. Features that the client should also probably support include bricking the machine remotely or something to that effect, as well as possibly sending messages or SOS tweets to let people know that it is not in its rightful owner's possession.

Enough rambling! To the code!

Client:

As I said, the code uses python-dbus and the standard json and urllib modules. You can install python-dbus with an apt-get install python-dbus.

import dbus
import json
import urllib

NM = 'org.freedesktop.NetworkManager'
NMP = '/org/freedesktop/NetworkManager'
NMI = NM + '.Device'
PI = 'org.freedesktop.DBus.Properties'

def list_ssids():
    bus = dbus.SystemBus()
    nm = bus.get_object(NM,NMP)
    nmi = dbus.Interface(nm,NM)
    # collect access points from every wireless device; stays empty if there are none
    wifi = []
    # Iterate over the devices queried via the interface
    for dev in nmi.GetDevices():
        # get each and bind a property interface to it
        devo = bus.get_object(NM,dev)
        devpi = dbus.Interface(devo,PI)
        # DeviceType 2 is a wifi device
        if devpi.Get(NM+'.Device','DeviceType') == 2:
            wdevi = dbus.Interface(devo,NMI + '.Wireless')
            for ap in wdevi.GetAccessPoints():
                apo = bus.get_object(NM,ap)
                api = dbus.Interface(apo,PI)
                ssid = ''.join(["%c" % b for b in api.Get("org.freedesktop.NetworkManager.AccessPoint", "Ssid", byte_arrays=True)])
                mac = ''.join(["%c" % b for b in api.Get("org.freedesktop.NetworkManager.AccessPoint", "HwAddress", byte_arrays=True)])
                wifi.append({'ssid': ssid, 'mac': mac})

    return wifi

if __name__ == '__main__':
  ap = list_ssids()
  data = json.dumps(ap)
  params = urllib.urlencode({'wifi': data})
  f = urllib.urlopen("http://127.0.0.1/junk/geo/geopost2.php", params)

You will need to modify the post URL at the end to somewhere meaningful for yourself of course.
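
To see exactly what the server receives, the payload-building step can be run by itself with a hand-written access point list. A small sketch: the SSID and MAC values are made up, the helper name `build_payload` is my own, and the import dance is only there so it runs on both Python 2 and 3.

```python
import json

try:
    from urllib import urlencode          # Python 2
except ImportError:
    from urllib.parse import urlencode    # Python 3


def build_payload(access_points):
    """Encode the access point list as the form field the client posts."""
    return urlencode({'wifi': json.dumps(access_points)})


# made-up sample in the same shape that list_ssids() returns
aps = [{'ssid': 'HomeNet', 'mac': '00:11:22:33:44:55'}]
print(build_payload(aps))
```

The result is a single form field called wifi holding a JSON string, which is why the server has to decode that field before it can use the list.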

Initially, I did the client using the PHP ext/dbus, but decided against that due to the fact that it is hard to install and nobody really uses it…

The server side is just as simple. You will need a MongoDB instance running on the server and a simple script to catch the POSTs. NOTE: This script is just a concept, so if you are actually going to do something like this, clean it up!

<?php
$wifi = $_POST['wifi'];
$wifi = json_decode($wifi);
$request = array( 'version' => '1.1.0', 'host' => 'myurl.com', 'wifi_towers' => $wifi );
$c = curl_init();
curl_setopt( $c, CURLOPT_URL, 'https://www.google.com/loc/json' );
curl_setopt( $c, CURLOPT_POST, 1 );
curl_setopt( $c, CURLOPT_POSTFIELDS, json_encode( $request ) );
curl_setopt( $c, CURLOPT_RETURNTRANSFER, true );
$result = json_decode( curl_exec( $c ) )->location;

$fields = array(
            'lat'=>urlencode($result->latitude),
            'lon'=>urlencode($result->longitude),
            'accuracy'=>urlencode($result->accuracy),
            'laptopid' => urlencode('16cptl-pscott'),
            'wifi'=>urlencode(json_encode($wifi)),
        );


// connect
$m = new Mongo();
// select a database
$db = $m->laptoptrack;
$collection = $db->lappoints;

// add a record
$obj = array( "loc" => array("lon" => floatval($fields['lon']), "lat" => floatval($fields['lat'])), "lon" => floatval($fields['lon']), "lat" => floatval($fields['lat']), "accuracy" => intval($fields['accuracy']), "laptopid" => $fields['laptopid'], "wifi" => json_encode($wifi));
$collection->ensureIndex(array('loc' => "2d"));
$collection->insert($obj, array('safe'=>true)); 

So from the above code, you will see that we create a 2d geospatial index on the MongoDB instance as well. Not sure if this is useful, but it will probably help in speeding up queries like “Gimme all the laptops that Company X owns in area Y”, if that is something that you would like to add to your query interface of course…

Also, I am not 100% sure of the legality of storing SSID’s with a location, so check that one first too!

Dead simple, works well.

I would say that the client bit should run from a cron job or similar that pings the service every hour or so.

Remember: MongoDB works best on a 64-bit OS. If you are using a 32-bit arch, then you will only be able to store around 2GB of data at a time. Depending on the need for historical records etc, keep that in mind…

Most importantly, HAVE FUN!

This looks pretty interesting!

Reposted from old site – original date: Sunday 6 June 2010

Description from announcement…

PiCloud, a cloud computing platform for the Python Programming Language, has released version 1.9 of its client library, cloud. PiCloud enables Python users to leverage the power of an on-demand, high performance, and auto scaling compute cluster with as few as three lines of code! No server management necessary. You can find out more here: http://www.picloud.com

What’s New:
* Store your data files on the cloud using our cloud.files interface!
* Greatly reduced the cloud library’s memory and cpu usage when sending large data.
* Map jobs begin processing before cloud.map() returns–large performance gains.
* Persistent user processes has in many cases reduced function overhead by over 50%.
* Increased network protocol reliability.
* Profiling is now disabled by default, but can be enabled with the _profile keyword.
* Bug fixes, and much more!

Full service description:
PiCloud is a cloud computing platform that integrates into the Python Programming Language. It enables you to leverage the compute power of Amazon Web Services without having to manage, maintain, or configure virtual servers.

PiCloud integrates seamlessly into your existing code base through a custom Python library, cloud. To offload the execution of a function to the cloud, all you must do is pass your desired function into the cloud library. PiCloud will then run the function on its high-performance and automatically-scaling cluster. We quickly scale our server capacity to meet your computational needs, and only charge you for the resources you actually consume. Getting on the cloud has never been this easy!

PiCloud improves the full cycle of software development and deployment. Functions that are run on PiCloud have their resource usage monitored, performance analyzed, and errors traced; we further aggregate all your functions to give you a bird’s eye view of your service. Through these introspective capabilities, PiCloud enables you to develop faster, easier, and smarter.

Common use cases for our platform:
* Scientific computing
* Video and image encoding
* Statistical analysis of data sets
* Real-time data processing
* Charts and graphs generation

Why Tweetdeck?

Reposted from old site – original date: Thursday 26 November 2009

I was thinking a bit about stuff last night, and especially the fact that I am under pressure from a lot of folks to move away from Tweetdeck (which I think is awesome) to a web-based client like HootSuite, Seesmic web or any of the other AJAXY things out there (there are now loads to choose from). The bottom line is, yes, they probably do work well, and yes, that is great, but there are very distinct disadvantages to using a browser-based client:

You need an open browser window, “maximised” to be useful.

IMO browser windows take up a LOT of desktop real estate, so not great when you have 6 shells open too, and, and, and…

I realise that Tweetdeck does take up quite a lot of resources, although it seems MUCH better in its latest incarnation, but I really like it.

Code wise, would you rather write a bunch of JavaScript and PHP or something to do a simple AJAX based Twitter client, or would you rather write a
hundred or so lines of python to do the same thing on your desktop? Remembering that in a web environment you have to hack thread-like behaviour, while on desktop
threads are a no-brainer to do background, non-blocking updates etc.
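
To make the desktop side of that argument concrete, here is a minimal sketch of a background poller using only the standard library. The `fake_fetch` function is a stand-in of my own for a real timeline request, and the short interval is just so the example finishes quickly.

```python
import threading

try:
    import queue              # Python 3
except ImportError:
    import Queue as queue     # Python 2


def poll(fetch, results, stop, interval):
    """Call fetch() repeatedly on a worker thread until stop is set."""
    while not stop.is_set():
        results.put(fetch())
        # sleep for the interval, but wake immediately if stop is set
        stop.wait(interval)


def fake_fetch():
    # stand-in for a real timeline request
    return "timeline fetched"


results = queue.Queue()
stop = threading.Event()
worker = threading.Thread(target=poll, args=(fake_fetch, results, stop, 0.01))
worker.daemon = True
worker.start()

# The main (GUI) thread stays free; here it simply waits for the first result.
first = results.get(timeout=2)
stop.set()
worker.join(2)
print(first)
```

The worker never blocks the main thread: it drops results on a queue and the interface picks them up whenever it is ready.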

Basically, what I am saying is that RIA (Rich Internet Applications) rock. They bring the best of both worlds to the user in a controlled and resource happy environment. In a web sense, if all my users
were using desktop clients, I would be over the moon, as the only thing I would have to serve is JSON or something similarly lightweight, saving my bandwidth bill and my hardware bill, as I could support
way more folks than a heavier interface could!

The trick though is to save yourself time too. Write simple, cross-platform desktop apps that are easily maintainable and package-able. I chose wxPython to demo with below. You don’t need whole whacks of code;
you just need a minimal interface to hold together some glue code making use of existing packages. The code below is not pretty, but it is terse and to the point.

If you would like to improve the code, please feel free, but please try to keep it under 100 lines…

So can you write a simple, but useful Twitter client in fewer than 100 lines? Here is one in 98, including shebang…

#!/usr/bin/env python
import wx
import twitter
import time
import sys 

class RedirectText(object):
    def __init__(self,aWxTextCtrl):
        self.out=aWxTextCtrl
 
    def write(self,string):
        self.out.WriteText(string) 

class MyFrame(wx.Frame):

    def __init__(self, *args, **kwds):
        kwds["style"] = wx.ICONIZE|wx.CAPTION|wx.MINIMIZE|wx.CLOSE_BOX|wx.MINIMIZE_BOX|wx.MAXIMIZE_BOX|wx.SYSTEM_MENU|wx.RESIZE_BORDER|wx.CLIP_CHILDREN
        wx.Frame.__init__(self, *args, **kwds)
        self.textUpdate = wx.TextCtrl(self, -1, "", style=wx.TE_PROCESS_ENTER|wx.TE_MULTILINE|wx.TE_AUTO_URL|wx.TE_LINEWRAP|wx.TE_WORDWRAP)
        self.updateButton = wx.Button(self, -1, "Update", style=wx.BU_EXACTFIT)
        self.panel = wx.Panel(self, wx.ID_ANY)
        self.log = wx.TextCtrl(self.panel, wx.ID_ANY, "",  style=wx.TE_MULTILINE|wx.TE_WORDWRAP)
        #self.log.Disable() 

        self.__set_properties()
        self.__do_layout()

        self.Bind(wx.EVT_TEXT_ENTER, self.onUpdate, self.textUpdate)
        self.Bind(wx.EVT_BUTTON, self.onUpdate, self.updateButton)

    def __set_properties(self):
        self.username = 'paulscott56'
        self.password = 'password123'
        self.encoding = None
        
        self.SetTitle("Twitrrr")
        self.SetSize(wx.DLG_SZE(self, (205, 234)))
        self.textUpdate.SetFocus()
        self.fetchTwitter(self)
        

    def __do_layout(self):
        sizer_1 = wx.BoxSizer(wx.VERTICAL)
        sizer_2 = wx.BoxSizer(wx.VERTICAL)
        sizer_3 = wx.BoxSizer(wx.HORIZONTAL)
        sizer_2.Add(self.textUpdate, 1, wx.ALL|wx.EXPAND, 4)
        sizer_2.Add(self.updateButton, 0, wx.ALL|wx.EXPAND, 4)
        sizer_3.Add(self.log, 4, wx.EXPAND|wx.ALIGN_CENTER_HORIZONTAL|wx.ALIGN_CENTER_VERTICAL, 4)
        self.panel.SetSizer(sizer_3)
        sizer_2.Add(self.panel, 1, wx.EXPAND, 0)
        sizer_1.Add(sizer_2, 1, wx.EXPAND, 0)
        self.SetSizer(sizer_1)
        self.Layout()
        self.Centre()

    def onUpdate(self, event):
        self.api = twitter.Api(username=self.username, password=self.password, input_encoding=self.encoding)
        message = self.textUpdate.GetValue()
        try:
            status = self.api.PostUpdate(message)
        except UnicodeDecodeError:
            print "Your message could not be encoded.  Perhaps it contains non-ASCII characters? "
        else:
            print "%s just posted: %s" % (status.user.name, status.text)
        self.textUpdate.Clear()
        event.Skip()

    def fetchTwitter(self,  event):
        self.api = twitter.Api(username=self.username, password=self.password, input_encoding=self.encoding)
        statuses = self.api.GetFriendsTimeline(user=self.username)
        self.log.Clear()
        statuses.reverse()
        for s in statuses:
            # make the text thing
            sn = s.user.screen_name
            txt = s.text
            time = s.relative_created_at
            stringthing = sn+": "+txt+" "+time+'\r\n'
            # redirect text
            redir=RedirectText(self.log)
            sys.stdout=redir 
            
            print stringthing
        # make a timer here
        timeLapse = 60000
        self.timer = wx.Timer(self)
        self.timer.Start(timeLapse)
        self.Bind(wx.EVT_TIMER, self.fetchTwitter, self.timer) 

# end of class MyFrame

if __name__ == "__main__":
    app = wx.PySimpleApp(0)
    wx.InitAllImageHandlers()
    Twitrrr = MyFrame(None, -1, "")
    app.SetTopWindow(Twitrrr)
    Twitrrr.Show()
    app.MainLoop()


Very dirty site monitor

Reposted from old site – original date: 2009-05-26 11:54:39

I needed a quick script to monitor a bunch of sites quickly and *very* simply. I looked at doing this all in a single thread with a ping, but then decided to go a non blocking route and make the thing a little more thread safe. This was done so that multiple sites that actually could go down could be monitored with relative accuracy, without a single site blocking the rest.

OK, enough flapping, here is the code (Python this time)

import threading
import os, popen2, select, signal
import Queue
import sys, xmpp
import time

HOST_UP = 1
HOST_DOWN = 0

PING_INTERVAL = 60

jidparams={'jid':'[email protected]/pinger', 'password':'somepass'}
jid=xmpp.protocol.JID(jidparams['jid'])
cl=xmpp.Client(jid.getDomain(), debug=[])
con=cl.connect() ##proxy={'host':'cache.company.co.za','port':'8080','user':'userdude','password':'le password'})
if not con:
    print 'could not connect!'
    sys.exit()

auth=cl.auth(jid.getNode(),jidparams['password'],resource=jid.getResource())

if not auth:
    print 'could not authenticate!'
    sys.exit()

def log(s):
    print s
    text = s
    tojid='[email protected]'
    #cl.SendInitPresence(requestRoster=0)
    id=cl.send(xmpp.protocol.Message(tojid,text))
    time.sleep(1)

class Pinger(threading.Thread):
    def __init__(self, queue, address, *args, **kwargs):
        self.address = address
        self.queue = queue
        threading.Thread.__init__(self, *args, **kwargs)
        self.stop = 0

    def run(self):
        # send stderr to stdout so every line of ping output is read below
        child = popen2.Popen3("ping -i %i %s 2>&1" % (PING_INTERVAL, self.address))
        while 1:
            ready_fds = select.select([child.fromchild], [], [])
            line = child.fromchild.readline()
            if line.find("Destination Host Unreachable") >= 0:
                log("host %s is down" % self.address)
            else:
                pass

            if self.stop:
                os.kill(child.pid, signal.SIGTERM)
                break

def main():
    pinglist = ["www.company1.co.za", "it.company.co.za", "something.company.co.za", "172.16.65.208"]
    threads = []
    queue = Queue.Queue()

    for adr in pinglist:
        threads.append(Pinger(queue, adr))

    for thread in threads:
        thread.start()

    try:
        while 1:
            time.sleep(1)
    except KeyboardInterrupt:
        pass

    for thread in threads:
        thread.stop = 1

    for thread in threads:
        thread.join(2.0)

    # Ok, I've had enough of you stoopid threads.
    os._exit(0)

if __name__ == "__main__":
    main()
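
The check at the heart of each Pinger thread can be tried without pinging anything or logging in to a Jabber server. A minimal sketch: the function name `watch` is my own, and the two sample lines are hand-written to look like ping output rather than captured from a real run.

```python
def watch(lines, address, report):
    """Scan lines of ping output and report each 'unreachable' reply."""
    for line in lines:
        if "Destination Host Unreachable" in line:
            report("host %s is down" % address)


# hand-written sample lines, not real ping output
sample = [
    "64 bytes from 172.16.65.208: icmp_seq=1 ttl=64 time=0.4 ms",
    "From 172.16.65.1 icmp_seq=2 Destination Host Unreachable",
]

alerts = []
watch(sample, "172.16.65.208", alerts.append)
print(alerts)
```

Passing the reporting function in as an argument means the same loop can print, send an XMPP message as log() does above, or just collect alerts in a list as it does here.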

Basic pyinotify usage

Reposted from old site – original date: 2009-04-26 14:07:17

Basic pyinotify usage:

import os
from pyinotify import WatchManager, Notifier, ThreadedNotifier, EventsCodes, ProcessEvent
wm = WatchManager()
mask = EventsCodes.IN_DELETE | EventsCodes.IN_CREATE  # watched events

class PTmp(ProcessEvent):
    def process_IN_CREATE(self, event):
        print "Create: %s" %  os.path.join(event.path, event.name)

    def process_IN_DELETE(self, event):
        print "Remove: %s" %  os.path.join(event.path, event.name)

notifier = Notifier(wm, PTmp())
wdd = wm.add_watch('/home/paul/Desktop', mask, rec=True)

while True:  # loop forever
    try:
        # process the queue of events
        notifier.process_events()
        if notifier.check_events():
            # read notified events and enqueue them
            notifier.read_events()
            # you can do some tasks here...
    except KeyboardInterrupt:
        # destroy the inotify instance on this interrupt (stop monitoring)
        notifier.stop()
        break

Twitter style times in Python

Reposted from old site – original date: 2009-04-25 13:03:23

Some python code to get cool “posted x minutes/hours/days ago” a la Twitter:

    # We need something to fudge the times for the "about" bit
    fudge = 1.25
    # Now for the actual time delta
    delta  = int(self.now) - int(self.created_at_in_seconds)

    if delta < (1 * fudge):
      return 'about a second ago'
    elif delta < (60 * (1/fudge)):
      return 'about %d seconds ago' % (delta)
    elif delta < (60 * fudge):
      return 'about a minute ago'
    elif delta < (60 * 60 * (1/fudge)):
      return 'about %d minutes ago' % (delta / 60)
    elif delta < (60 * 60 * fudge):
      return 'about an hour ago'
    elif delta < (60 * 60 * 24 * (1/fudge)):
      return 'about %d hours ago' % (delta / (60 * 60))
    elif delta < (60 * 60 * 24 * fudge):
      return 'about a day ago'
    else:
      return 'about %d days ago' % (delta / (60 * 60 * 24))

That's it! Simple, eh?
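
The snippet above is the body of a method, so it will not run by itself. Here is a sketch of the same logic as a standalone function that takes the age in seconds directly. It assumes each threshold check is a less-than comparison, and the name `relative_time` is my own.

```python
def relative_time(delta, fudge=1.25):
    """Turn an age in whole seconds into a Twitter-style 'about ...' string."""
    if delta < 1 * fudge:
        return 'about a second ago'
    elif delta < 60 * (1 / fudge):
        return 'about %d seconds ago' % delta
    elif delta < 60 * fudge:
        return 'about a minute ago'
    elif delta < 60 * 60 * (1 / fudge):
        return 'about %d minutes ago' % (delta // 60)
    elif delta < 60 * 60 * fudge:
        return 'about an hour ago'
    elif delta < 60 * 60 * 24 * (1 / fudge):
        return 'about %d hours ago' % (delta // (60 * 60))
    elif delta < 60 * 60 * 24 * fudge:
        return 'about a day ago'
    else:
        return 'about %d days ago' % (delta // (60 * 60 * 24))


for seconds in (1, 30, 60, 600, 3600, 7200, 86400, 259200):
    print(relative_time(seconds))
```

The fudge factor is what makes 60 seconds come out as "about a minute ago" instead of "about 1 minutes ago": anything between 48 and 75 seconds falls into the minute bucket.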