pygo/lib/gtpsocket.py

138 lines
3.7 KiB
Python

# A class for communicating in Go Text Protocol
#
# An already-connected socket should be passed in, then this class
# can be used to handle some aspects of GTP simply
# Commands that any user of this class *must* support...
# quit
# boardsize
# clear_board
# komi
# play
# genmove
class GTPSocket:
    """Exchange Go Text Protocol (GTP) messages over a connected socket.

    A few administrative commands (the initial contents of ``known_cmds``)
    are answered directly by ``get``; every other command is parsed and
    handed back to the caller.
    """

    # By default the set only has commands we can handle internally in this
    # class.  Anything that uses this class should add any commands it can
    # handle to this set.  A common pattern would be:
    #
    #   dispatcher = {'command_1': function_1, 'command_2': function_2}
    #   gtp_socket = GTPSocket(socket)
    #   GTPSocket.known_cmds = GTPSocket.known_cmds | set(dispatcher.keys())
    #   gtp = gtp_socket.get()
    #   if gtp is not None and gtp.type == 'message':
    #       dispatcher[gtp.command](gtp)
    known_cmds = set(['protocol_version', 'name', 'version', 'known_command', 'list_commands'])

    class _Record(object):
        """Plain attribute container for a parsed command or response."""

    def __init__(self, socket):
        """Wrap *socket*, which must already be connected.

        ``last_id`` holds the id of the most recent command passed to
        ``send``; it starts at 0 so the first command goes out with id 1.
        """
        self.socket = socket
        self.last_id = 0

    def get(self):
        """Block until one message arrives and return its parsed form.

        Each chunk returned by ``recv`` is treated as exactly one message.

        Returns:
            * a response object (``type == 'response'``) when the peer sent
              a ``=`` or ``?`` reply;
            * ``None`` when the message was one of the commands answered
              internally (a reply has already been sent);
            * otherwise a command object with ``type == 'message'``, ``id``,
              ``command`` and ``arguments``.

        Raises:
            EOFError: the peer closed the connection (``recv`` returned no
                data).  Socket errors other than a timeout propagate.
        """
        from socket import timeout as recv_timeout

        while True:
            try:
                msg = self.socket.recv(4096)
            except recv_timeout:
                # Nothing arrived within the socket's timeout; keep waiting.
                continue
            if not msg:
                # An empty read means the peer hung up; looping again would
                # spin forever without ever receiving data.
                raise EOFError('GTP connection closed by peer')
            if msg[0] == '?':
                print("Error: GTP response: " + msg)
                return self._msg_to_response(msg)
            if msg[0] == '=':
                return self._msg_to_response(msg)
            if _validate_gtp(msg):
                break
            print('Error: Incoming data was not a valid GTP message')

        gtp = self._msg_to_gtp(msg)
        # Some gtp commands should be handled internally
        command = gtp.command
        if command == 'protocol_version':
            self.send_response('2', gtp.id)
        elif command == 'name':
            self.send_response('pygo', gtp.id)
        elif command == 'version':
            self.send_response('', gtp.id)
        elif command == 'known_command':
            # A missing argument cannot name a known command.
            known = bool(gtp.arguments) and gtp.arguments[0] in GTPSocket.known_cmds
            self.send_response('true' if known else 'false', gtp.id)
        elif command == 'list_commands':
            # One command per line; sorted so the listing is deterministic.
            self.send_response('\n'.join(sorted(GTPSocket.known_cmds)), gtp.id)
        else:
            return gtp
        return None

    def send(self, msg):
        """Send the command text *msg*, prefixed with the next sequential id.

        Returns:
            True when the command was written to the socket; False when the
            resulting line is not valid GTP or the socket reported an error.
        """
        line = str(self.last_id + 1) + ' ' + msg + "\n"
        if not _validate_gtp(line):
            print('Error: Outgoing data was not a valid GTP message')
            return False
        # The id is consumed as soon as the line validates, even if the
        # write below fails.
        self.last_id += 1
        try:
            self.socket.send(line)
        except (IOError, OSError):
            return False
        return True

    def send_response(self, msg, resp_id=None, is_error=False):
        """Send a GTP response.

        Args:
            msg: result text; may be empty.
            resp_id: id of the command being answered, or None for no id.
            is_error: send a failure (``?``) instead of a success (``=``).

        Returns:
            True when the response was written, False on a socket error.
        """
        to_send = '?' if is_error else '='
        if resp_id is not None:
            to_send += str(resp_id)
        if msg:
            # The result is separated from the status/id prefix by a space.
            to_send += ' ' + msg
        to_send += "\n\n"
        try:
            self.socket.send(to_send)
        except (IOError, OSError):
            return False
        return True

    def _msg_to_gtp(self, msg):
        """Parse an already validated command line into a command object.

        The line is split on any whitespace, so the trailing newline never
        ends up attached to the command name or the last argument.
        ``arguments`` is always a list (empty when there are none).
        """
        data = msg.split()
        gtp = self._Record()
        gtp.type = 'message'
        gtp.id = int(data[0])
        gtp.command = data[1]
        gtp.arguments = data[2:]
        return gtp

    def _msg_to_response(self, msg):
        """Parse a raw ``=``/``?`` reply into a response object.

        The returned object has ``type == 'response'``, ``is_error`` (True
        for a ``?`` reply), ``id`` (int, or None when no digits directly
        follow the status character) and ``result`` (the remaining text with
        surrounding whitespace removed).
        """
        resp = self._Record()
        resp.type = 'response'
        resp.is_error = msg.startswith('?')
        payload = msg[1:]
        # The id, when present, sits immediately after the status character.
        id_len = len(payload) - len(payload.lstrip('0123456789'))
        resp.id = int(payload[:id_len]) if id_len else None
        resp.result = payload[id_len:].strip()
        return resp
# For now, we *require* our messages to have an id.
# This violates the protocol, but we'll deal with the
# more complicated case later.
def _validate_gtp(msg):
    """Check that *msg* starts like a GTP command carrying a numeric id.

    A message is accepted when it begins with one or more digits, followed
    by whitespace, followed by at least one word character.

    Returns:
        The ``re`` match object (truthy) when the message is accepted,
        otherwise None.
    """
    # Imported here because the module has no top-level import of ``re``.
    import re

    return re.match(r'\d+?\s+\w+?', msg)