Saturday, February 18, 2012

Websockets Revisited - Part 2

This is part 2 of a 3 part post covering my recent experiences with websockets. In this section I will just post the source code and show you how to test it out. In the next section (if your interested) I will go over the changes I made to websocket.py and client.html to get it working with both Safari and Chrome on my MacBook Pro.

Copy and paste the following 3 files into the same directory on your existing websever. All you need to change is one line in the client.html file. Line 41 should look like this ...

ws = new WebSocket("ws://174.143.214.70:9876/");

Just change the IP address to point to your webserver IP address. Once that is done you run ...

python example.py

Then you point your browser to client.html and type something in the message input field and press the button next to it and if you see a response in the browser then it is working. I did not update the demo to show a timer tic or to broadcast the message to all connected clients. That is left as an exercise for the reader :-)

So here is client.html ...


<!DOCTYPE html>
<html lang="en">
<head>
<title>Test</title>
<script type="application/javascript">
var ws;

var file, fr, new_str;


function receivedText() {
result = fr.result;
new_str = "";
for (n = 0; n < result.length; ++n) {
aByte = result.charCodeAt(n);
new_str += String.fromCharCode(aByte);
}
// alert(new_str);
handleCommand();
}

function handleCommand(){
//da = e.data.split(":");
da = new_str.split(":");


cmd = da[0];
data = da[1];
// servermsg.innerHTML = servermsg.innerHTML + "<br>Recieved data: " + e.data;
if (cmd == "tick"){
tic.innerHTML = data;
}else{
servermsg.innerHTML = servermsg.innerHTML + "<br>Rcvd: " + data;
}
}


function init() {
var servermsg = document.getElementById("servermsg");

ws = new WebSocket("ws://174.143.214.70:9876/");
ws.onopen = function(){
servermsg.innerHTML = servermsg.innerHTML + "<br>Server connected";
};
ws.onmessage = function(e){

x = e.data;
retcode = (x instanceof Blob);

if (retcode){
// this works for chrome who returns a blob object from the websocket
new_str = "";
file = e.data;
fr = new FileReader();
fr.onload = receivedText;
fr.readAsText(file);
}else{
new_str = e.data;
handleCommand();
}

};
ws.onclose = function(){
console.log("Server disconnected");
servermsg.innerHTML = servermsg.innerHTML + "<br>Disconnected";
};
}
function postmsg(){
var text = document.getElementById("message").value;
text = "msg:" + text;
ws.send(text);
servermsg.innerHTML = servermsg.innerHTML + "<br>Sent: " + text;
return false;
}
</script>
</head>
<body onload="init();">
<span id="tic">0</span><br/>
<input type="text" name="message" value="" id="message">
<button onClick="postmsg();">Send</button>
<div id="servermsg" style="overflow:auto; height:400px; width:400px; border:1px solid black"><h1>Message log:</h1></div>
</body>
</html>




Yes, I am a terrible hack; I know :-(

Next, we have the example.py file ...


import websocket
import time


#bind = ('127.0.0.1', 8888)
class MyWrap(websocket.WebSocketServer):
pass

def new_client(self):
while 1:
bufs_list, closed_string = self.recv_frames()
print "Received --->", bufs_list
ret_code = self.send_frames(bufs=bufs_list)
print "ret code from send is ", ret_code

def poll(self):
#print "Inside Poll"
pass

if __name__ == "__main__":
server = MyWrap(verbose=True, listen_host='', listen_port=9876)
server.start_server()
while 1:
time.sleep(1)





And finally websocket.py ...


#!/usr/bin/env python

'''
Python WebSocket library with support for "wss://" encryption.
Copyright 2011 Joel Martin
Licensed under LGPL version 3 (see docs/LICENSE.LGPL-3)

Supports following protocol versions:
- http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-75
- http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76
- http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-10

You can make a cert/key with openssl using:
openssl req -new -x509 -days 365 -nodes -out self.pem -keyout self.pem
as taken from http://docs.python.org/dev/library/ssl.html#certificates

'''

import os, sys, time, errno, signal, socket, traceback, select
import array, struct
from cgi import parse_qsl
from base64 import b64encode, b64decode

# Imports that vary by python version

# python 3.0 differences
if sys.hexversion > 0x3000000:
b2s = lambda buf: buf.decode('latin_1')
s2b = lambda s: s.encode('latin_1')
s2a = lambda s: s
else:
b2s = lambda buf: buf # No-op
s2b = lambda s: s # No-op
s2a = lambda s: [ord(c) for c in s]
try: from io import StringIO
except: from cStringIO import StringIO
try: from http.server import SimpleHTTPRequestHandler
except: from SimpleHTTPServer import SimpleHTTPRequestHandler
try: from urllib.parse import urlsplit
except: from urlparse import urlsplit

# python 2.6 differences
try: from hashlib import md5, sha1
except: from md5 import md5; from sha import sha as sha1

# python 2.5 differences
try:
from struct import pack, unpack_from
except:
from struct import pack
def unpack_from(fmt, buf, offset=0):
slice = buffer(buf, offset, struct.calcsize(fmt))
return struct.unpack(fmt, slice)

# Degraded functionality if these imports are missing
for mod, sup in [('numpy', 'HyBi protocol'), ('ssl', 'TLS/SSL/wss'),
('multiprocessing', 'Multi-Processing'),
('resource', 'daemonizing')]:
try:
globals()[mod] = __import__(mod)
except ImportError:
globals()[mod] = None
print("WARNING: no '%s' module, %s is slower or disabled" % (
mod, sup))
if multiprocessing and sys.platform == 'win32':
# make sockets pickle-able/inheritable
import multiprocessing.reduction


class WebSocketServer(object):
"""
WebSockets server class.
Must be sub-classed with new_client method definition.
"""

buffer_size = 65536

server_handshake_hixie = """HTTP/1.1 101 Web Socket Protocol Handshake\r
Upgrade: WebSocket\r
Connection: Upgrade\r
%sWebSocket-Origin: %s\r
%sWebSocket-Location: %s://%s%s\r
"""

server_handshake_hybi = """HTTP/1.1 101 Switching Protocols\r
Upgrade: websocket\r
Connection: Upgrade\r
Sec-WebSocket-Accept: %s\r
"""

GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

policy_response = """<cross-domain-policy><allow-access-from domain="*" to-ports="*" /></cross-domain-policy>\n"""

class EClose(Exception):
pass

def __init__(self, listen_host='', listen_port=None, source_is_ipv6=False,
verbose=False, cert='', key='', ssl_only=None,
daemon=False, record='', web='',
run_once=False, timeout=0):

# settings
self.verbose = verbose
self.listen_host = listen_host
self.listen_port = listen_port
self.ssl_only = ssl_only
self.daemon = daemon
self.run_once = run_once
self.timeout = timeout

self.launch_time = time.time()
self.ws_connection = False
self.handler_id = 1

# Make paths settings absolute
self.cert = os.path.abspath(cert)
self.key = self.web = self.record = ''
if key:
self.key = os.path.abspath(key)
if web:
self.web = os.path.abspath(web)
if record:
self.record = os.path.abspath(record)

if self.web:
os.chdir(self.web)

# Sanity checks
if not ssl and self.ssl_only:
raise Exception("No 'ssl' module and SSL-only specified")
if self.daemon and not resource:
raise Exception("Module 'resource' required to daemonize")

# Show configuration
print("WebSocket server settings:")
print(" - Listen on %s:%s" % (
self.listen_host, self.listen_port))
print(" - Flash security policy server")
if self.web:
print(" - Web server. Web root: %s" % self.web)
if ssl:
if os.path.exists(self.cert):
print(" - SSL/TLS support")
if self.ssl_only:
print(" - Deny non-SSL/TLS connections")
else:
print(" - No SSL/TLS support (no cert file)")
else:
print(" - No SSL/TLS support (no 'ssl' module)")
if self.daemon:
print(" - Backgrounding (daemon)")
if self.record:
print(" - Recording to '%s.*'" % self.record)

#
# WebSocketServer static methods
#

@staticmethod
def socket(host, port=None, connect=False, prefer_ipv6=False):
""" Resolve a host (and optional port) to an IPv4 or IPv6
address. Create a socket. Bind to it if listen is set,
otherwise connect to it. Return the socket.
"""
flags = 0
if host == '':
host = None
if connect and not port:
raise Exception("Connect mode requires a port")
if not connect:
flags = flags | socket.AI_PASSIVE
addrs = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM,
socket.IPPROTO_TCP, flags)
if not addrs:
raise Exception("Could resolve host '%s'" % host)
addrs.sort(key=lambda x: x[0])
if prefer_ipv6:
addrs.reverse()
sock = socket.socket(addrs[0][0], addrs[0][1])
if connect:
sock.connect(addrs[0][4])
else:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(addrs[0][4])
sock.listen(100)
return sock

@staticmethod
def daemonize(keepfd=None, chdir='/'):
os.umask(0)
if chdir:
os.chdir(chdir)
else:
os.chdir('/')
os.setgid(os.getgid()) # relinquish elevations
os.setuid(os.getuid()) # relinquish elevations

# Double fork to daemonize
if os.fork() > 0: os._exit(0) # Parent exits
os.setsid() # Obtain new process group
if os.fork() > 0: os._exit(0) # Parent exits

# Signal handling
def terminate(a,b): os._exit(0)
signal.signal(signal.SIGTERM, terminate)
signal.signal(signal.SIGINT, signal.SIG_IGN)

# Close open files
maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
if maxfd == resource.RLIM_INFINITY: maxfd = 256
for fd in reversed(range(maxfd)):
try:
if fd != keepfd:
os.close(fd)
except OSError:
_, exc, _ = sys.exc_info()
if exc.errno != errno.EBADF: raise

# Redirect I/O to /dev/null
os.dup2(os.open(os.devnull, os.O_RDWR), sys.stdin.fileno())
os.dup2(os.open(os.devnull, os.O_RDWR), sys.stdout.fileno())
os.dup2(os.open(os.devnull, os.O_RDWR), sys.stderr.fileno())

@staticmethod
def unmask(buf, f):
pstart = f['hlen'] + 4
pend = pstart + f['length']
if numpy:
b = c = s2b('')
if f['length'] >= 4:
mask = numpy.frombuffer(buf, dtype=numpy.dtype('<u4'),
offset=f['hlen'], count=1)
data = numpy.frombuffer(buf, dtype=numpy.dtype('<u4'),
offset=pstart, count=int(f['length'] / 4))
#b = numpy.bitwise_xor(data, mask).data
b = numpy.bitwise_xor(data, mask).tostring()

if f['length'] % 4:
#print("Partial unmask")
mask = numpy.frombuffer(buf, dtype=numpy.dtype('B'),
offset=f['hlen'], count=(f['length'] % 4))
data = numpy.frombuffer(buf, dtype=numpy.dtype('B'),
offset=pend - (f['length'] % 4),
count=(f['length'] % 4))
c = numpy.bitwise_xor(data, mask).tostring()
return b + c
else:
# Slower fallback
data = array.array('B')
mask = s2a(f['mask'])
data.fromstring(buf[pstart:pend])
for i in range(len(data)):
data[i] ^= mask[i % 4]
return data.tostring()

@staticmethod
def encode_hybi(buf, opcode, base64=False):
""" Encode a HyBi style WebSocket frame.
Optional opcode:
0x0 - continuation
0x1 - text frame (base64 encode buf)
0x2 - binary frame (use raw buf)
0x8 - connection close
0x9 - ping
0xA - pong
"""
if base64:
buf = b64encode(buf)

b1 = 0x80 | (opcode & 0x0f) # FIN + opcode
payload_len = len(buf)
if payload_len <= 125:
header = pack('>BB', b1, payload_len)
elif payload_len > 125 and payload_len < 65536:
header = pack('>BBH', b1, 126, payload_len)
elif payload_len >= 65536:
header = pack('>BBQ', b1, 127, payload_len)

#print("Encoded: %s" % repr(header + buf))

return header + buf, len(header), 0

@staticmethod
def decode_hybi(buf, base64=False):
""" Decode HyBi style WebSocket packets.
Returns:
{'fin' : 0_or_1,
'opcode' : number,
'mask' : 32_bit_number,
'hlen' : header_bytes_number,
'length' : payload_bytes_number,
'payload' : decoded_buffer,
'left' : bytes_left_number,
'close_code' : number,
'close_reason' : string}
"""

f = {'fin' : 0,
'opcode' : 0,
'mask' : 0,
'hlen' : 2,
'length' : 0,
'payload' : None,
'left' : 0,
'close_code' : None,
'close_reason' : None}

blen = len(buf)
f['left'] = blen

if blen < f['hlen']:
return f # Incomplete frame header

b1, b2 = unpack_from(">BB", buf)
f['opcode'] = b1 & 0x0f
f['fin'] = (b1 & 0x80) >> 7
has_mask = (b2 & 0x80) >> 7

f['length'] = b2 & 0x7f

if f['length'] == 126:
f['hlen'] = 4
if blen < f['hlen']:
return f # Incomplete frame header
(f['length'],) = unpack_from('>xxH', buf)
elif f['length'] == 127:
f['hlen'] = 10
if blen < f['hlen']:
return f # Incomplete frame header
(f['length'],) = unpack_from('>xxQ', buf)

full_len = f['hlen'] + has_mask * 4 + f['length']

if blen < full_len: # Incomplete frame
return f # Incomplete frame header

# Number of bytes that are part of the next frame(s)
f['left'] = blen - full_len

# Process 1 frame
if has_mask:
# unmask payload
f['mask'] = buf[f['hlen']:f['hlen']+4]
f['payload'] = WebSocketServer.unmask(buf, f)
else:
print("Unmasked frame: %s" % repr(buf))
f['payload'] = buf[(f['hlen'] + has_mask * 4):full_len]

if base64 and f['opcode'] in [1, 2]:
try:
f['payload'] = b64decode(f['payload'])
except:
print("Exception while b64decoding buffer: %s" %
repr(buf))
raise

if f['opcode'] == 0x08:
if f['length'] >= 2:
f['close_code'] = unpack_from(">H", f['payload'])
if f['length'] > 3:
f['close_reason'] = f['payload'][2:]

return f

@staticmethod
def OLD_encode_hixie(buf):
return s2b("\x00" + b2s(b64encode(buf)) + "\xff"), 1, 1

@staticmethod
def encode_hixie(buf):
return "\x00" + buf + "\xff", 1, 1

@staticmethod
def decode_hixie(buf):
end = buf.find(s2b('\xff'))

# return {'payload': b64decode(buf[1:end]),
# 'hlen': 1,
# 'length': end - 1,
# 'left': len(buf) - (end + 1)}

return {'payload': buf[1:end],
'hlen': 1,
'length': end - 1,
'left': len(buf) - (end + 1)}

@staticmethod
def gen_md5(keys):
""" Generate hash value for WebSockets hixie-76. """
key1 = keys['Sec-WebSocket-Key1']
key2 = keys['Sec-WebSocket-Key2']
key3 = keys['key3']
spaces1 = key1.count(" ")
spaces2 = key2.count(" ")
num1 = int("".join([c for c in key1 if c.isdigit()])) / spaces1
num2 = int("".join([c for c in key2 if c.isdigit()])) / spaces2

return b2s(md5(pack('>II8s',
int(num1), int(num2), key3)).digest())

#
# WebSocketServer logging/output functions
#

def traffic(self, token="."):
""" Show traffic flow in verbose mode. """
if self.verbose and not self.daemon:
sys.stdout.write(token)
sys.stdout.flush()

def msg(self, msg):
""" Output message with handler_id prefix. """
if not self.daemon:
print("% 3d: %s" % (self.handler_id, msg))

def vmsg(self, msg):
""" Same as msg() but only if verbose. """
if self.verbose:
self.msg(msg)

#
# Main WebSocketServer methods
#
def send_frames(self, bufs=None):
""" Encode and send WebSocket frames. Any frames already
queued will be sent first. If buf is not set then only queued
frames will be sent. Returns the number of pending frames that
could not be fully sent. If returned pending frames is greater
than 0, then the caller should call again when the socket is
ready. """

tdelta = int(time.time()*1000) - self.start_time

if bufs:
for buf in bufs:
print "XXXXXX VERSION ", self.version
if self.version.startswith("hybi"):
if self.base64:
encbuf, lenhead, lentail = self.encode_hybi(
buf, opcode=1, base64=True)
else:
encbuf, lenhead, lentail = self.encode_hybi(
buf, opcode=2, base64=False)

else:
encbuf, lenhead, lentail = self.encode_hixie(buf)

if self.rec:
self.rec.write("%s,\n" %
repr("{%s{" % tdelta
+ encbuf[lenhead:-lentail]))

self.send_parts.append(encbuf)

while self.send_parts:
# Send pending frames
buf = self.send_parts.pop(0)

print "YYYYYYY Sending this to client ", buf
for l in buf:
print "%2x" % (ord(l))

sent = self.client.send(buf)

if sent == len(buf):
self.traffic("<")
else:
self.traffic("<.")
self.send_parts.insert(0, buf[sent:])
break

return len(self.send_parts)

def recv_frames(self):
""" Receive and decode WebSocket frames.

Returns:
(bufs_list, closed_string)
"""

closed = False
bufs = []
tdelta = int(time.time()*1000) - self.start_time

buf = self.client.recv(self.buffer_size)
if len(buf) == 0:
closed = "Client closed abruptly"
return bufs, closed

if self.recv_part:
# Add partially received frames to current read buffer
buf = self.recv_part + buf
self.recv_part = None

while buf:
if self.version.startswith("hybi"):

frame = self.decode_hybi(buf, base64=self.base64)
#print("Received buf: %s, frame: %s" % (repr(buf), frame))

if frame['payload'] == None:
# Incomplete/partial frame
self.traffic("}.")
if frame['left'] > 0:
self.recv_part = buf[-frame['left']:]
break
else:
if frame['opcode'] == 0x8: # connection close
closed = "Client closed, reason: %s - %s" % (
frame['close_code'],
frame['close_reason'])
break

else:
if buf[0:2] == s2b('\xff\x00'):
closed = "Client sent orderly close frame"
break

elif buf[0:2] == s2b('\x00\xff'):
buf = buf[2:]
continue # No-op

elif buf.count(s2b('\xff')) == 0:
# Partial frame
self.traffic("}.")
self.recv_part = buf
break

frame = self.decode_hixie(buf)

self.traffic("}")

if self.rec:
start = frame['hlen']
end = frame['hlen'] + frame['length']
self.rec.write("%s,\n" %
repr("}%s}" % tdelta + buf[start:end]))


bufs.append(frame['payload'])

if frame['left']:
buf = buf[-frame['left']:]
else:
buf = ''

return bufs, closed

def send_close(self, code=None, reason=''):
""" Send a WebSocket orderly close frame. """

if self.version.startswith("hybi"):
msg = s2b('')
if code != None:
msg = pack(">H%ds" % (len(reason)), code)

buf, h, t = self.encode_hybi(msg, opcode=0x08, base64=False)
self.client.send(buf)

elif self.version == "hixie-76":
buf = s2b('\xff\x00')
self.client.send(buf)

# No orderly close for 75

def do_handshake(self, sock, address):
"""
do_handshake does the following:
- Peek at the first few bytes from the socket.
- If the connection is Flash policy request then answer it,
close the socket and return.
- If the connection is an HTTPS/SSL/TLS connection then SSL
wrap the socket.
- Read from the (possibly wrapped) socket.
- If we have received a HTTP GET request and the webserver
functionality is enabled, answer it, close the socket and
return.
- Assume we have a WebSockets connection, parse the client
handshake data.
- Send a WebSockets handshake server response.
- Return the socket for this WebSocket client.
"""

stype = ""

ready = select.select([sock], [], [], 3)[0]
if not ready:
raise self.EClose("ignoring socket not ready")
# Peek, but do not read the data so that we have a opportunity
# to SSL wrap the socket first
handshake = sock.recv(1024, socket.MSG_PEEK)
#self.msg("Handshake [%s]" % handshake)

if handshake == "":
raise self.EClose("ignoring empty handshake")

elif handshake.startswith(s2b("<policy-file-request/>")):
# Answer Flash policy request
handshake = sock.recv(1024)
sock.send(s2b(self.policy_response))
raise self.EClose("Sending flash policy response")

elif handshake[0] in ("\x16", "\x80", 22, 128):
# SSL wrap the connection
if not ssl:
raise self.EClose("SSL connection but no 'ssl' module")
if not os.path.exists(self.cert):
raise self.EClose("SSL connection but '%s' not found"
% self.cert)
retsock = None
try:
retsock = ssl.wrap_socket(
sock,
server_side=True,
certfile=self.cert,
keyfile=self.key)
except ssl.SSLError:
_, x, _ = sys.exc_info()
if x.args[0] == ssl.SSL_ERROR_EOF:
raise self.EClose(x.args[1])
else:
raise

scheme = "wss"
stype = "SSL/TLS (wss://)"

elif self.ssl_only:
raise self.EClose("non-SSL connection received but disallowed")

else:
retsock = sock
scheme = "ws"
stype = "Plain non-SSL (ws://)"

wsh = WSRequestHandler(retsock, address, not self.web)
if wsh.last_code == 101:
# Continue on to handle WebSocket upgrade
pass
elif wsh.last_code == 405:
raise self.EClose("Normal web request received but disallowed")
elif wsh.last_code < 200 or wsh.last_code >= 300:
raise self.EClose(wsh.last_message)
elif self.verbose:
raise self.EClose(wsh.last_message)
else:
raise self.EClose("")

h = self.headers = wsh.headers
path = self.path = wsh.path

prot = 'WebSocket-Protocol'
protocols = h.get('Sec-'+prot, h.get(prot, '')).split(',')

ver = h.get('Sec-WebSocket-Version')
if ver:
# HyBi/IETF version of the protocol

# HyBi-07 report version 7
# HyBi-08 - HyBi-12 report version 8
# HyBi-13 reports version 13
if ver in ['7', '8', '13']:
self.version = "hybi-%02d" % int(ver)
else:
raise self.EClose('Unsupported protocol version %s' % ver)

key = h['Sec-WebSocket-Key']

# Choose binary if client supports it
if 'binary' in protocols:
self.base64 = False
elif 'base64' in protocols:
self.base64 = True
else:
#raise self.EClose("Client must support 'binary' or 'base64' protocol")
print "XXX SHIT!!! client does not support binary or base64 WTF???"
self.base64 = False

# Generate the hash value for the accept header
accept = b64encode(sha1(s2b(key + self.GUID)).digest())

response = self.server_handshake_hybi % b2s(accept)
if self.base64:
response += "Sec-WebSocket-Protocol: base64\r\n"
else:
response += "Sec-WebSocket-Protocol: binary\r\n"
response += "\r\n"

else:
# Hixie version of the protocol (75 or 76)

if h.get('key3'):
trailer = self.gen_md5(h)
pre = "Sec-"
self.version = "hixie-76"
else:
trailer = ""
pre = ""
self.version = "hixie-75"

# We only support base64 in Hixie era
self.base64 = True

response = self.server_handshake_hixie % (pre,
h['Origin'], pre, scheme, h['Host'], path)

if 'base64' in protocols:
response += "%sWebSocket-Protocol: base64\r\n" % pre
else:
self.msg("Warning: client does not report 'base64' protocol support")
response += "\r\n" + trailer

self.msg("%s: %s WebSocket connection" % (address[0], stype))
self.msg("%s: Version %s, base64: '%s'" % (address[0],
self.version, self.base64))
if self.path != '/':
self.msg("%s: Path: '%s'" % (address[0], self.path))


# Send server WebSockets handshake response
#self.msg("sending response [%s]" % response)
retsock.send(s2b(response))

# Return the WebSockets socket which may be SSL wrapped
return retsock


#
# Events that can/should be overridden in sub-classes
#
def started(self):
""" Called after WebSockets startup """
self.vmsg("WebSockets server started")

def poll(self):
""" Run periodically while waiting for connections. """
#self.vmsg("Running poll()")
pass

def fallback_SIGCHLD(self, sig, stack):
# Reap zombies when using os.fork() (python 2.4)
self.vmsg("Got SIGCHLD, reaping zombies")
try:
result = os.waitpid(-1, os.WNOHANG)
while result[0]:
self.vmsg("Reaped child process %s" % result[0])
result = os.waitpid(-1, os.WNOHANG)
except (OSError):
pass

def do_SIGINT(self, sig, stack):
self.msg("Got SIGINT, exiting")
sys.exit(0)

def top_new_client(self, startsock, address):
""" Do something with a WebSockets client connection. """
# Initialize per client settings
self.send_parts = []
self.recv_part = None
self.base64 = False
self.rec = None
self.start_time = int(time.time()*1000)

# handler process
try:
try:
self.client = self.do_handshake(startsock, address)

if self.record:
# Record raw frame data as JavaScript array
fname = "%s.%s" % (self.record,
self.handler_id)
self.msg("opening record file: %s" % fname)
self.rec = open(fname, 'w+')
self.rec.write("var VNC_frame_data = [\n")

self.ws_connection = True
self.new_client()
except self.EClose:
_, exc, _ = sys.exc_info()
# Connection was not a WebSockets connection
if exc.args[0]:
self.msg("%s: %s" % (address[0], exc.args[0]))
except Exception:
_, exc, _ = sys.exc_info()
self.msg("handler exception: %s" % str(exc))
if self.verbose:
self.msg(traceback.format_exc())
finally:
if self.rec:
self.rec.write("'EOF']\n")
self.rec.close()

if self.client and self.client != startsock:
self.client.close()

def new_client(self):
""" Do something with a WebSockets client connection. """
raise("WebSocketServer.new_client() must be overloaded")

def start_server(self):
"""
Daemonize if requested. Listen for for connections. Run
do_handshake() method for each connection. If the connection
is a WebSockets client then call new_client() method (which must
be overridden) for each new client connection.
"""
lsock = self.socket(self.listen_host, self.listen_port)

if self.daemon:
self.daemonize(keepfd=lsock.fileno(), chdir=self.web)

self.started() # Some things need to happen after daemonizing

# Allow override of SIGINT
signal.signal(signal.SIGINT, self.do_SIGINT)
if not multiprocessing:
# os.fork() (python 2.4) child reaper
signal.signal(signal.SIGCHLD, self.fallback_SIGCHLD)

while True:
try:
try:
self.client = None
startsock = None
pid = err = 0

time_elapsed = time.time() - self.launch_time
if self.timeout and time_elapsed > self.timeout:
self.msg('listener exit due to --timeout %s'
% self.timeout)
break

try:
self.poll()

ready = select.select([lsock], [], [], 1)[0]
if lsock in ready:
startsock, address = lsock.accept()
else:
continue
except Exception:
_, exc, _ = sys.exc_info()
if hasattr(exc, 'errno'):
err = exc.errno
elif hasattr(exc, 'args'):
err = exc.args[0]
else:
err = exc[0]
if err == errno.EINTR:
self.vmsg("Ignoring interrupted syscall")
continue
else:
raise

if self.run_once:
# Run in same process if run_once
self.top_new_client(startsock, address)
if self.ws_connection :
self.msg('%s: exiting due to --run-once'
% address[0])
break
elif multiprocessing:
self.vmsg('%s: new handler Process' % address[0])
p = multiprocessing.Process(
target=self.top_new_client,
args=(startsock, address))
p.start()
# child will not return
else:
# python 2.4
self.vmsg('%s: forking handler' % address[0])
pid = os.fork()
if pid == 0:
# child handler process
self.top_new_client(startsock, address)
break # child process exits

# parent process
self.handler_id += 1

except KeyboardInterrupt:
_, exc, _ = sys.exc_info()
print("In KeyboardInterrupt")
pass
except SystemExit:
_, exc, _ = sys.exc_info()
print("In SystemExit")
break
except Exception:
_, exc, _ = sys.exc_info()
self.msg("handler exception: %s" % str(exc))
if self.verbose:
self.msg(traceback.format_exc())

finally:
if startsock:
startsock.close()


# HTTP handler with WebSocket upgrade support
class WSRequestHandler(SimpleHTTPRequestHandler):
def __init__(self, req, addr, only_upgrade=False):
self.only_upgrade = only_upgrade # only allow upgrades
SimpleHTTPRequestHandler.__init__(self, req, addr, object())

def do_GET(self):
if (self.headers.get('upgrade') and
self.headers.get('upgrade').lower() == 'websocket'):

if (self.headers.get('sec-websocket-key1') or
self.headers.get('websocket-key1')):
# For Hixie-76 read out the key hash
self.headers.__setitem__('key3', self.rfile.read(8))

# Just indicate that an WebSocket upgrade is needed
self.last_code = 101
self.last_message = "101 Switching Protocols"
elif self.only_upgrade:
# Normal web request responses are disabled
self.last_code = 405
self.last_message = "405 Method Not Allowed"
else:
SimpleHTTPRequestHandler.do_GET(self)

def send_response(self, code, message=None):
# Save the status code
self.last_code = code
SimpleHTTPRequestHandler.send_response(self, code, message)

def log_message(self, f, *args):
# Save instead of printing
self.last_message = f % args






Again, I apologize for the hacky code (it really is a shame as websocket.py is written so well) but I left the affected area's logging in to enunciate the differences. It will also dump out the request so you can see the differences in the headers. In the next post I will incorporate the changes better so they are backward compatibile with the original code.

Anyway, this demo should work with Safari and Chrome on a MacBook Pro. Your mileage may vary.

Websockets Revisited - Part 1

So in my last post I naively stated that websockets were easy. What a difference a few days makes. It turns out I can only find two browsers for my MacBook Pro that currently support websockets and the code on both the client and the server must be different for each. As I mentioned to my friend Scott the other day, I hope this doesn't turn out like Ajax did where you have all sorts of cross browser compatibility issues because so far that is exactly what is happening.

A bit of background first. The websocket spec RFC 6455 (http://tools.ietf.org/html/draft-ietf-hybi-thewebsocketprotocol-17) covers the websocket protocol. The first version "draft-ietf-hybi-thewebsocketprotocol-00" which is what my version of Safari (5.1.3) supports, framed its data using a 0x00 byte to start the frame and a 0xff byte to end the frame. It had two security keys in the handshake header and the response to these keys was calculated by concatenating the numbers from the first key, and dividing by the number of spaces. This is then repeated for the second key. The two resulting numbers are concatenated with each other, and with the last 8 bytes after the fields. The final result is an MD5 sum of the concatenated string (this is from the websocket wiki page).

Obviously, the server will need to do some work to provide a proper handshake to the browser. Specifically, it must interpret the header and indicate to the client it has done so by providing a return value derived from the two security keys in the header (ugh!).

Now, while the code in my last post indeed works with Safari, it does not work with Chrome (version 17.0.963.56). That's because Chrome uses the newer version of the websocket protocol; "draft-ietf-hybi-thewebsocketprotocol-06". This version was created because of man-in-the-middle attacks and other security vulnerabilities so now we have a completely different, non-backward compatible version of websockets floating around out there. In this second version of the specification, there is not two but one security key and its value is calculated differently. To make matters worse, there is a bit-oriented header and a 4 byte xor mask passed with each frame and the actual frame data is 'masked' (uber ugh!).

Of course, this is just the server side. My experience on the client side is that Safari passes websocket data back to the Javascript application as a string but Chrome passes it back as a blob. Being new to blobs (and HTML5 files) it took me a while to figure out how to deal with the blob data. The thing that concerns me is not the different ways the data is presented to the application (blob vs string) but that it is different at all. Don't these guys who write browsers communicate? This is why I think Javascript tool kits like Dojo and jQuery are safe at least for the near future.

Anyway, as you can see from the above descriptions of header parsing and handshaking, this is a lot more code than I feel like writing just to get a simple websocket demo working, so I started looking for websocket servers. Now, while I can code in Assembly, C, C++, Java, PERL, PHP, Python, Ruby and Javascript (as well as Pascal and Fortran but we won't go there) I would prefer the server to be written in Python since that is what I am most comfortable with lately and specifically, for most of my personal projects (because I always assume my project will become so successful it will need to scale to gazillions of users) I tend to use Tornado (my favorite Python webserver).

It turns out there is probably a websocket server written in any language you like (including erlang :-) and so I started reading up on them. From what I read, the Tornado websocket support is buggy (but I could be wrong) so I kept away from it; for now. I do not like big ass frameworks and many of the servers I looked at were add-ons to existing systems. I wanted a simple piece of code my simple mind could comprehend so heaven forbid, if it didn't work I could possibly debug it (this turned out to be prophetic to say the least) and as a result I settled on "websocket.py".

When I brought up "websocket.py" it did not work with my version of Safari or Chrome. Hell, I even got Safari working with a small piece of code 2 days ago, so I figured I would roll up my shirt sleeves and hack around "websocket.py" and make it work. I will go over the code mods and the new code in the next blog - "Websockets Revisited - Part 2". It concerns me though because I am sure whoever wrote "websocket.py" tested it on something and it worked so my mods have probably broken something that was working, but, I also know that what was not working with my two browsers is now working so I'll try to relay this information to the original author of "websocket.py".





Thursday, February 16, 2012

Websockets and Python

I had recently built a push notification framework using AJAX and a FireFox quirk described here (http://ajaxpatterns.org/HTTP_Streaming) with a Tornado webserver to keep the socket open ala long poll, however, this only works on Firefox and is really a kludgy way to do things so I recently bit the bullet and dove into HTML5 Websockets.

Even though Websockets are not supported on Firefox yet (and probably not IE, I don't know and after working with JavaScript over the past 10 years I really don't care about IE) I think moving forward this is the way to go. What I discovered is using Websockets is really really easy. I copied some basic code from StackOverflow (http://stackoverflow.com/questions/4372657/websocket-handshake-problem-using-python-server) modified it to be a little easier on the eyes and included a universal timer tick so all users can see they are in sync in real-time, and viola - we have the following 2 really small code snippets.

To test this app on your environment you will need a separate web server that can serve out the .html page to a browser on the web. I use APACHE but any web server will do including the one built into python which can be run like this ...

sudo python -m SimpleHTTPServer 80

If you want it to handle port 80 (http), or ...

python -m SimpleHTTPServer 8888

If you want it to run on a simple user port.

Once you have that working so you can point a browser to your client.html page and run it, you can start your python server (the code below) like this ...

python server.py

This assumes you name the client file 'client.html' and the server file 'server.py'. Note this is not an optimal solution as I am not sure how well this approach will scale (python threads are not real fast). I usually use Tornado for my personal projects that have to scale but I read that Tornado is not working well with Websockets yet. If anyone knows otherwise, please let me know.

Anyway, assuming all is well, this will show a page with a universal timer tick which should be in sync across all connected browsers/users and a message area where any messages sent from anyone currently connected are displayed.

Don't forget to modify your code to use your IP address. The IP address in the code points to a mini dev server I have which may or may not be active when you try out the code. Also, I have only tested it on Safari. It may or may not work on other browsers.

Now that I have this code working I can use it to build my HTML5 2D Game Engine framework which I will release in a future post.

Now on to the actual code ...


First, the client code (JavaScript in HTML) ...
<!DOCTYPE html>
<html lang="en">
<head>
<title>Test</title>
<script type="application/javascript">
var ws;

function init() {
var servermsg = document.getElementById("servermsg");

ws = new WebSocket("ws://174.143.214.70:9876/");
ws.onopen = function(){
servermsg.innerHTML = servermsg.innerHTML + "<br>Server connected";
};
ws.onmessage = function(e){
da = e.data.split(":");
cmd = da[0];
data = da[1];
// servermsg.innerHTML = servermsg.innerHTML + "<br>Recieved data: " + e.data;
if (cmd == "tick"){
tic.innerHTML = data;
}else{
servermsg.innerHTML = servermsg.innerHTML + "<br>" + data;
}
};
ws.onclose = function(){
console.log("Server disconnected");
servermsg.innerHTML = servermsg.innerHTML + "<br>Disconnected";
};
}
function postmsg(){
var text = document.getElementById("message").value;
ws.send(text);
servermsg.innerHTML = servermsg.innerHTML + "<br>Sent: " + text;
return false;
}
</script>
</head>
<body onload="init();">
<span id="tic">0</span><br/>
<input type="text" name="message" value="" id="message">
<button onClick="postmsg();">Send</button>
<div id="servermsg" style="overflow:auto; height:400px; width:400px; border:1px solid black"><h1>Message log:</h1></div>
</body>
</html>

And next, the server code (written in Python as the title implies) ...


#!/usr/bin/env python

import socket
import threading
import struct
import hashlib
import binascii
import time

PORT = 9876

# evil globals
requestList = []
exitFlag = 0
tic = 0
lock = threading.Lock()
clients = []

# timer tick
class ThreadClass(threading.Thread):
def run(self):
global exitFlag
global tic
global requestList
global lock
global clients
invalidConnections = []
while exitFlag == 0:
data = "tick:%s" % (tic)
data = chr(0) + data + chr(255)
print data
# Broadcast tic data to all clients
lock.acquire()
[conn.send(data) for conn in clients]
lock.release()
tic = tic + 1
time.sleep(1)

print "\nNotice - exiting thread !"


def create_handshake_resp(handshake):
final_line = ""
lines = handshake.splitlines()
for line in lines:
parts = line.partition(": ")
if parts[0] == "Sec-WebSocket-Key1":
key1 = parts[2]
elif parts[0] == "Sec-WebSocket-Key2":
key2 = parts[2]
elif parts[0] == "Host":
host = parts[2]
elif parts[0] == "Origin":
origin = parts[2]
final_line = line

spaces1 = key1.count(" ")
spaces2 = key2.count(" ")
num1 = int("".join([c for c in key1 if c.isdigit()])) / spaces1
num2 = int("".join([c for c in key2 if c.isdigit()])) / spaces2

token = hashlib.md5(struct.pack('>II8s', num1, num2, final_line)).digest()

return (
"HTTP/1.1 101 WebSocket Protocol Handshake\r\n"
"Upgrade: WebSocket\r\n"
"Connection: Upgrade\r\n"
"Sec-WebSocket-Origin: %s\r\n"
"Sec-WebSocket-Location: ws://%s/\r\n"
"\r\n"
"%s") % (
origin, host, token)


def handle(s, addr):
global exitFlag
global lock
global clients
# acknowledge the connection
data = s.recv(1024)
s.send(create_handshake_resp(data))

while exitFlag == 0:
print "Waiting for data from", s, addr
data = s.recv(1024)
print "Done"
if not data:
print "No data"
break

print "%s Bytes of Data recvd from %s --->%s" % (len(data), addr, data)
data = data[1:-1]
data = chr(0) + "msg:" + data + chr(255)

# Broadcast received data to all clients
lock.acquire()
[conn.send(data) for conn in clients]
lock.release()

print 'Client closed:', addr
lock.acquire()
clients.remove(s)
lock.release()
s.close()

def start_server():
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(('', PORT))
s.listen(1)
while 1:
conn, addr = s.accept()
print 'Connected by', addr
clients.append(conn)
threading.Thread(target = handle, args = (conn, addr)).start()

t = ThreadClass()
try:
t.start()
start_server()
except (KeyboardInterrupt, SystemExit):
print "\nNotice - CTL+C Received, Shutting Down, 1 Sec ..."
exitFlag = 1
time.sleep(2)
# note - should probably use join here
print "\nAll Clean, Exiting Server."





That's it. Its just that simple.