Streaming Example

  • LiamP
    Junior Member
    • Oct 2015
    • 284

    #1

    Streaming Example

    I have been playing around with market/order streaming and have added an initial commit to my library:

    https://github.com/liampauling/betfairlightweight

    The ability to connect to every market with minimal CPU usage is awesome, nice work Betfair!

    Here is the basic Stream class I am using; I would welcome any criticism / advice from anyone else experimenting.

    Code:
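    import json
    import logging
    import socket
    import ssl
    import threading

    # StreamListener (the default listener below) is defined elsewhere in the library.


    class BetfairStream: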
        """Stream holder, socket connects to betfair,
        pushes any received data to listener
        """
    
        __host = 'stream-api.betfair.com'
        __port = 443
        __CRLF = '\r\n'
        __encoding = 'utf-8'
    
        def __init__(self, unique_id, listener=None, timeout=5, buffer_size=1024):
            self.unique_id = unique_id
            self.listener = listener if listener else StreamListener()
            self.timeout = timeout
            self.buffer_size = buffer_size
    
            self.socket = None
            self.running = False
    
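        def start(self, async=False):
            """Creates socket and starts read loop.
    
            Note: `async` became a reserved keyword in Python 3.7, so this
            parameter needs renaming (e.g. to `async_`) on newer interpreters.
    
            :param async: If True new thread is started
            """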
            self.running = True
            self.socket = self._create_socket()
            if async:
                threading.Thread(name='BetfairStream', target=self._read_loop, daemon=True).start()
            else:
                self._read_loop()
    
        def stop(self):
            """Closes socket and stops read loop
            """
            self.running = False
            self.socket.close()
            logging.info('[Connect: %s]: Socket closed' % self.unique_id)
    
        def authenticate(self, app_key, session_token, unique_id=1):
            """Authentication request.
    
            :param app_key:
            :param session_token:
            :param unique_id: If not supplied 1 is used.
            """
            message = {'op': 'authentication',
                       'id': unique_id,
                       'appKey': app_key,
                       'session': session_token}
            self._send(message)
    
        def heartbeat(self, unique_id=None):
            """Heartbeat request to keep session alive.
    
            :param unique_id: self.unique_id used if not supplied.
            """
            message = {'op': 'heartbeat',
                       'id': self.unique_id if not unique_id else unique_id}
            self._send(message)
    
        def subscribe_to_markets(self, market_filter=None, market_data_filter=None, unique_id=None):
            """Market subscription request.
    
            :param market_filter: Market filter.
            :param market_data_filter: Market data filter.
            :param unique_id: self.unique_id used if not supplied.
            """
            message = {'op': 'marketSubscription',
                       'id': self.unique_id if not unique_id else unique_id,
                       'marketFilter': market_filter,
                       'marketDataFilter': market_data_filter}
            self._send(message)
    
        def subscribe_to_orders(self, unique_id=None):
            """Order subscription request.
    
            :param unique_id: self.unique_id used if not supplied.
            """
            message = {'op': 'orderSubscription',
                       'id': self.unique_id if not unique_id else unique_id}
            self._send(message)
    
        def _create_socket(self):
            """Creates ssl socket and connects to stream api.
    
            :return: Connected socket.
            """
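            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # ssl.wrap_socket() was removed in Python 3.12; use an SSLContext instead.
            context = ssl.create_default_context()
            s = context.wrap_socket(s, server_hostname=self.__host)
            s.connect((self.__host, self.__port))
            return s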
    
        def _read_loop(self):
            """Read loop, splits by CRLF and pushes received data
            to _data.
            """
            while self.running:
                try:
                    received_data_raw = self._receive_all()
                    received_data_split = received_data_raw.split(self.__CRLF)
                    for received_data in received_data_split:
                        if received_data:
                            self._data(received_data)
                except OSError:
                    break
            logging.warning('_read_loop ended: %s' % self.unique_id)
    
        def _receive_all(self):
            """Whilst socket is running receives data from socket,
            till CRLF is detected.
    
            :return: Decoded data.
            """
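            crlf = self.__CRLF.encode(self.__encoding)
            data = b''
            while self.running and not data.endswith(crlf):
                part = self.socket.recv(self.buffer_size)
                if not part:
                    # recv() returns b'' once the peer has closed the connection;
                    # raising an OSError subclass lets _read_loop exit cleanly.
                    raise ConnectionError('Socket closed by remote host')
                data += part
            # Decode once at the end so multi-byte characters split across reads survive.
            return data.decode(self.__encoding)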
    
        def _data(self, received_data):
            """Sends data to listener, if False is returned; socket
            is closed.
    
            :param received_data: Decoded data received from socket.
            """
            if self.listener.on_data(received_data) is False:
                self.stop()
    
        def _send(self, message):
            """Adds CRLF and sends message to Betfair.
    
            :param message: Data to be sent to Betfair.
            """
            message_dumped = json.dumps(message) + self.__CRLF
            # sendall() keeps writing until the whole message has been sent.
            self.socket.sendall(message_dumped.encode(self.__encoding))
    Last edited by LiamP; 04-11-2016, 08:54 PM.
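
A minimal sketch of a listener for the class above, assuming only the contract visible in `_data`: the stream calls `on_data` with one decoded message, and a return value of `False` closes the socket. The name `CollectingListener` and its attributes are hypothetical and not part of betfairlightweight; treating a status message with `connectionClosed` set as fatal is also an assumption.

```python
import json


class CollectingListener:
    """Hypothetical listener: keeps parsed messages, stops on a fatal status."""

    def __init__(self):
        self.messages = []

    def on_data(self, raw_data):
        """Called by the stream with one decoded message.

        Returning False asks the stream to stop; True keeps it running.
        """
        try:
            message = json.loads(raw_data)
        except ValueError:
            # Not valid JSON: ignore it and keep the stream alive.
            return True
        if not isinstance(message, dict):
            return True
        self.messages.append(message)
        # Assumption: a status message with connectionClosed=true is fatal.
        if message.get('op') == 'status' and message.get('connectionClosed'):
            return False
        return True
```

It would be passed in as `BetfairStream(unique_id, listener=CollectingListener())`, after which `listener.messages` holds everything received so far.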
  • AlgoTrader
    Junior Member
    • Mar 2012
    • 243

    #2
    Great to see another Streaming API example. I have found it quite convenient to use openssl to explore the protocol; it works like this:

    Code:
    #openssl s_client -connect stream-api.betfair.com:443
    ...skipped TLS stuff...
    {"op":"connection","connectionId":"002-080616101159-11444"}
    {"op":"authentication", "id":123, "appKey":"1234567"}
    {"op":"status","id":123,"statusCode":"FAILURE","errorCode":"INVALID_APP_KEY","errorMessage":"UnknownCaller","connectionClosed":true,"connectionId":"002-080616101159-11444"}
    I just copy and paste JSON from a text editor and receive responses immediately. The app key in the example is invalid, hence the INVALID_APP_KEY failure.
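
The session above shows the wire format: one JSON object per line, each terminated by CRLF. A minimal sketch of splitting a receive buffer along those lines; the helper name `split_messages` is hypothetical, and it assumes UTF-8 text and that a message may arrive in pieces.

```python
import json

CRLF = b'\r\n'


def split_messages(buffer):
    """Split raw bytes into complete JSON messages plus any trailing partial one.

    :param buffer: Bytes read so far from the socket.
    :return: (list of parsed messages, leftover bytes still waiting for a CRLF).
    """
    # Everything before the last CRLF is complete; the final piece may be partial.
    *complete, remainder = buffer.split(CRLF)
    messages = [json.loads(line.decode('utf-8')) for line in complete if line]
    return messages, remainder
```

The leftover bytes would be prepended to the next chunk read from the socket before calling the helper again.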

    It seems the streaming API is the next big thing for us. I do not use Python, but I will certainly explore your code to learn more about the new API.
    Betfair Bots Made Easy

    Comment

    • StefanBelo.
      Junior Member
      • Jan 2009
      • 105

      #3
      I have already implemented both APIs in my app:

      https://youtu.be/u82bY5bLZPU
      betfair bot platform, bfexplorer bot sdk

      Comment

      • LiamP
        Junior Member
        • Oct 2015
        • 284

        #4
        Originally posted by AlgoTrader View Post
        Great to see another Streaming API example. I have found it quite convenient to use openssl to explore the protocol; it works like this:
        That would have saved me some time, cheers!

        I have just started work on order streaming. Is anyone else finding it pointless with no 'mo' (matched orders) data?

        That makes fill-or-kill / stop / offset impossible.

        Comment

        • AlgoTrader
          Junior Member
          • Mar 2012
          • 243

          #5
          I just read the PDF; there seem to be mb and ml fields that stand for matched backs and matched lays. Are only transitions reported?

          The Stream API is useless on its own, as there is no subscription for new markets (the catalogue). It's a good start at least, since prices and volumes are a huge part of the total traffic, so my listMarketBook calls can be thinner.
          Betfair Bots Made Easy

          Comment

          • LiamP
            Junior Member
            • Oct 2015
            • 284

            #6
            Ah, the 'uo' status gets updated to EC (Execution Complete) when an order is matched, so ignore me.
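
A minimal sketch of that check, assuming the order change message layout described in the Stream API docs: an `oc` list of markets, each holding `orc` runner entries whose `uo` lists carry an order `id` and `status`. The helper name and the sample message are made up for illustration and were not captured from Betfair.

```python
def completed_order_ids(order_change_message):
    """Return the ids of orders whose 'uo' entry reports status 'EC'."""
    completed = []
    for market in order_change_message.get('oc', []):
        for runner in market.get('orc', []):
            for order in runner.get('uo', []):
                if order.get('status') == 'EC':
                    completed.append(order['id'])
    return completed


# Made-up sample: one market, one runner, one open and one completed order.
sample = {
    'op': 'ocm',
    'oc': [{
        'id': '1.123456789',
        'orc': [{
            'id': 47972,
            'uo': [
                {'id': '10001', 'status': 'E'},
                {'id': '10002', 'status': 'EC'},
            ],
        }],
    }],
}
```

Calling `completed_order_ids(sample)` picks out only the second order, which is the trigger a fill-or-kill or offset routine would wait for.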

            Comment

            • aye robot
              Junior Member
              • Jun 2015
              • 12

              #7
              Can anyone point me towards a simple overview of how streaming works? Is Betfair's streaming similar to other streams (like Twitter's) for which tutorials are available?

              I need to understand the overall structure explained as if to an idiot...

              Thanks

              Comment

              • vic
                Junior Member
                • May 2009
                • 33

                #8
                Are there any C# examples out there, anyone?

                Comment

                • jptrader
                  Junior Member
                  • Nov 2009
                  • 82

                  #9
                  Originally posted by vic View Post
                  Are there any C# examples out there, anyone?
                  https://github.com/betfair/stream-api-sample-code

                  Comment

                  • LiamP
                    Junior Member
                    • Oct 2015
                    • 284

                    #10
                    Originally posted by aye robot View Post
                    Can anyone point me towards a simple overview of how streaming works? Is Betfair's streaming similar to other streams (like Twitter's) for which tutorials are available?

                    I need to understand the overall structure explained as if to an idiot...

                    Thanks
                    Aye, have you seen the docs?

                    http://docs.developer.betfair.com/do...nge+Stream+API

                    It's similar to Twitter in that you create a socket, connect and then subscribe to markets/orders.
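
The connect, authenticate, subscribe sequence can be sketched without touching the network. This assumes the request shapes used by the BetfairStream class in post #1 (`authentication` and `marketSubscription` ops, sent as CRLF-terminated JSON); `build_requests` and the ids 1 and 2 are illustrative choices, not anything the API mandates.

```python
import json

CRLF = '\r\n'


def build_requests(app_key, session_token, market_filter, market_data_filter):
    """Return the encoded requests in the order they would be sent."""
    requests = [
        # Step 1: authenticate the freshly connected socket.
        {'op': 'authentication', 'id': 1,
         'appKey': app_key, 'session': session_token},
        # Step 2: ask for the markets and fields of interest.
        {'op': 'marketSubscription', 'id': 2,
         'marketFilter': market_filter,
         'marketDataFilter': market_data_filter},
    ]
    return [(json.dumps(r) + CRLF).encode('utf-8') for r in requests]
```

Each payload would then be written to the connected TLS socket with `sendall`, after which the server starts pushing updates for the subscription.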

                    Comment

                    • vic
                      Junior Member
                      • May 2009
                      • 33

                      #11
                      OK, Thanks JPTRADER

                      Comment

                      • footlong
                        Junior Member
                        • Nov 2016
                        • 8

                        #12
                        Liam - thank you for posting, your library is a great starting point. I'm not experienced in socket programming, and streaming data in particular, but have managed to get your code running using your example code.

                        However, I can only seem to run your code with:
                        Code:
                        betfair_socket.start(async=True)
                        The market data then simply gets printed to my console / terminal, which, although a big step in the right direction, ultimately isn't of much use unless there's a way to 'get hold' of that data.

                        Upon examining your code, I see that if I start with async=False, it should run _read_loop(), whose docstring says it pushes the incoming data to _data, which is what I am ultimately after. However, having run:
                        Code:
                        betfair_socket.start(async=False)
                        in interactive mode, it doesn't return to the command prompt, and so times out.

                        Any help would be much appreciated!

                        Comment

                        • LiamP
                          Junior Member
                          • Oct 2015
                          • 284

                          #13
                          Hi Footlong,

                          You're not the only one who has had this issue:

                          https://github.com/liampauling/betfa...ight/issues/11

                          Async is there so that you can start your own thread to catch errors:

                          Code:
                          def error_catching_thread(betfair_socket):
                              try:
                                  # Blocks here while the read loop runs.
                                  betfair_socket.start(async=False)
                              except BetfairError:
                                  print("Caught an error, let's restart...")
                          
                          # Run the blocking read loop in its own thread so the main thread stays free.
                          threading.Thread(target=error_catching_thread, args=(betfair_socket,)).start()
                          
                          betfair_socket.authenticate()
                          betfair_socket.subscribe_to_markets(
                              market_filter={'eventTypeIds': ['1'], 'inPlayOnly': True, 'marketTypes': ['MATCH_ODDS']},
                              market_data_filter={'fields': ['EX_BEST_OFFERS']})
                          Without starting a new thread it's impossible to receive updates from Betfair, unless you go down a concurrent/coroutine/async route, but I want to keep things simple.
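
A minimal sketch of the thread-based approach, showing one way to 'get hold' of the data: the blocking read loop runs in a worker thread and a listener hands each message to the main thread through a `queue.Queue`. `FakeStream` and `QueueListener` are hypothetical stand-ins so the example runs without a Betfair connection; a real stream would read from the socket instead of a list.

```python
import queue
import threading


class FakeStream:
    """Stand-in for a blocking stream; a real one would read from a socket."""

    def __init__(self, listener, lines):
        self.listener = listener
        self.lines = lines

    def start(self):
        # Blocks until every line has been pushed to the listener.
        for line in self.lines:
            self.listener.on_data(line)


class QueueListener:
    """Hypothetical listener that hands each message to another thread."""

    def __init__(self, output_queue):
        self.output_queue = output_queue

    def on_data(self, raw_data):
        self.output_queue.put(raw_data)
        return True


updates = queue.Queue()
stream = FakeStream(QueueListener(updates), ['{"op":"mcm","id":1}', '{"op":"mcm","id":2}'])
worker = threading.Thread(target=stream.start, daemon=True)
worker.start()

# The main thread stays free and pulls updates as they arrive.
first = updates.get(timeout=1)
second = updates.get(timeout=1)
worker.join(timeout=1)
```

The queue decouples the two sides: the read loop never waits on slow processing, and the consumer can block on `get` rather than polling.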

                          I am currently coding up v0.8.0, which should add some tests for streaming, and I am looking to refactor/simplify the listener/streams, as I am not happy with them and really want to use this in production.

                          Comment

                          • footlong
                            Junior Member
                            • Nov 2016
                            • 8

                            #14
                            Great - I'll keep an eye-out. Thanks Liam.

                            Comment

                            • LiamP
                              Junior Member
                              • Oct 2015
                              • 284

                              #15
                              I have now released v0.8 (0.8.4 to be exact) and added it to PyPI, so you can now:

                              Code:
                              pip install betfairlightweight
                              I am also now working on a simple, lightweight data recorder which uses streaming:

                              https://github.com/liampauling/flumine

                              This started off as a test of streaming, but I thought others might find it useful. It hardly uses any CPU: 1-2% on an AWS nano instance. It is still in development, as I will be working on it while I try to find any bugs in betfairlightweight's streaming code, but I recorded all of the weekend's racing with 3 lines of code!

                              Code:
                              from flumine import Flumine, RacingRecorder
                              
                              flumine = Flumine(
                                      trading=('username', 'password'),
                                      recorder=RacingRecorder(in_play=True)
                              )
                              flumine.start()

                              Comment
