I have been playing around with market/order streaming and have added an initial commit to my library:
https://github.com/liampauling/betfairlightweight
The ability to connect to every market with minimal cpu usage is awesome, nice work Betfair!
Here is the basic Stream class I am using, would welcome any criticism / advice from anyone else experimenting.
https://github.com/liampauling/betfairlightweight
The ability to connect to every market with minimal cpu usage is awesome, nice work Betfair!
Here is the basic Stream class I am using, would welcome any criticism / advice from anyone else experimenting.
Code:
class BetfairStream:
"""Stream holder, socket connects to betfair,
pushes any received data to listener
"""
__host = 'stream-api.betfair.com'
__port = 443
__CRLF = '\r\n'
__encoding = 'utf-8'
def __init__(self, unique_id, listener=None, timeout=5, buffer_size=1024):
self.unique_id = unique_id
self.listener = listener if listener else StreamListener()
self.timeout = timeout
self.buffer_size = buffer_size
self.socket = None
self.running = False
def start(self, async=False):
"""Creates socket and starts read loop.
:param async: If True new thread is started
"""
self.running = True
self.socket = self._create_socket()
if async:
threading.Thread(name='BetfairStream', target=self._read_loop, daemon=True).start()
else:
self._read_loop()
def stop(self):
"""Closes socket and stops read loop
"""
self.running = False
self.socket.close()
logging.info('[Connect: %s]: Socket closed' % self.unique_id)
def authenticate(self, app_key, session_token, unique_id=1):
"""Authentication request.
:param app_key:
:param session_token:
:param unique_id: If not supplied 1 is used.
"""
message = {'op': 'authentication',
'id': unique_id,
'appKey': app_key,
'session': session_token}
self._send(message)
def heartbeat(self, unique_id=None):
"""Heartbeat request to keep session alive.
:param unique_id: self.unique_id used if not supplied.
"""
message = {'op': 'heartbeat',
'id': self.unique_id if not unique_id else unique_id}
self._send(message)
def subscribe_to_markets(self, market_filter=None, market_data_filter=None, unique_id=None):
"""Market subscription request.
:param market_filter: Market filter.
:param market_data_filter: Market data filter.
:param unique_id: self.unique_id used if not supplied.
"""
message = {'op': 'marketSubscription',
'id': self.unique_id if not unique_id else unique_id,
'marketFilter': market_filter,
'marketDataFilter': market_data_filter}
self._send(message)
def subscribe_to_orders(self, unique_id=None):
"""Order subscription request.
:param unique_id: self.unique_id used if not supplied.
"""
message = {'op': 'orderSubscription',
'id': self.unique_id if not unique_id else unique_id}
self._send(message)
def _create_socket(self):
"""Creates ssl socket and connects to stream api.
:return: Connected socket.
"""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s = ssl.wrap_socket(s)
s.connect((self.__host, self.__port))
return s
def _read_loop(self):
"""Read loop, splits by CRLF and pushes received data
to _data.
"""
while self.running:
try:
received_data_raw = self._receive_all()
received_data_split = received_data_raw.split(self.__CRLF)
for received_data in received_data_split:
if received_data:
self._data(received_data)
except OSError:
break
logging.warning('_read_loop ended: %s' % self.unique_id)
def _receive_all(self):
"""Whilst socket is running receives data from socket,
till CRLF is detected.
:return: Decoded data.
"""
(data, part) = ('', '')
while self.running and part[-2:] != bytes(self.__CRLF, encoding=self.__encoding):
part = self.socket.recv(self.buffer_size)
if part:
data += part.decode(self.__encoding)
return data
def _data(self, received_data):
"""Sends data to listener, if False is returned; socket
is closed.
:param received_data: Decoded data received from socket.
"""
if self.listener.on_data(received_data) is False:
self.stop()
def _send(self, message):
"""Adds CRLF and sends message to Betfair.
:param message: Data to be sent to Betfair.
"""
message_dumped = json.dumps(message) + self.__CRLF
self.socket.send(message_dumped.encode())


Comment