diff --git a/ftx/wsapi.py b/ftx/wsapi.py index a715b24..486ead5 100644 --- a/ftx/wsapi.py +++ b/ftx/wsapi.py @@ -6,8 +6,7 @@ from json import JSONDecodeError from typing import Optional -import websockets -from websockets.legacy.client import WebSocketClientProtocol +from websocket import WebSocket from ftx.fifo import AsyncFifoQueue @@ -18,7 +17,7 @@ class FtxWebSocketClient: See: https://docs.ftx.com/#websocket-api """ - _ws: Optional[WebSocketClientProtocol] + _ws: Optional[WebSocket] def __init__(self, api_key: Optional[str] = None, @@ -46,12 +45,25 @@ def __init__(self, self.socket_url = socket_url self._queue = AsyncFifoQueue(maxsize=queue_size) + async def _connect(self, url): + await asyncio.get_event_loop().run_in_executor(None, lambda: self._ws.connect(url)) + + async def _close(self): + await asyncio.get_event_loop().run_in_executor(None, self._ws.close) + + async def _send(self, data): + await asyncio.get_event_loop().run_in_executor(None, lambda: self._ws.send(data)) + + async def _recv(self): + return await asyncio.get_event_loop().run_in_executor(None, self._ws.recv) + async def connect(self): """ Connect to the websocket :return: """ - self._ws = await websockets.connect(self.socket_url) + self._ws = WebSocket() + await self._connect(self.socket_url) self._log('websocket connected') asyncio.create_task(self._loop_fn()) asyncio.create_task(self._ping_loop_fn()) @@ -61,8 +73,8 @@ async def disconnect(self): Disconnects the websocket if it's open :return: """ - if self._ws and self._ws.open: - await self._ws.close() + if self._ws and self._ws.connected: + await self._close() self._ws = None @property @@ -70,7 +82,7 @@ def connected(self): """ True if the websocket is connected and open """ - return self._ws and self._ws.open + return self._ws and self._ws.connected @property def messages_dropped(self): @@ -103,7 +115,7 @@ async def send_message(self, msg): :param msg: the message """ self._log('->', msg) - await self._ws.send(json.dumps(msg)) + await self._send(json.dumps(msg)) async def _ping_loop_fn(self): """ @@ -123,7 +135,7 @@ async def _loop_fn(self): """ while self.connected: try: - msg = await self._ws.recv() + msg = await self._recv() try: self._on_message(json.loads(msg)) except JSONDecodeError: diff --git a/requirements.txt b/requirements.txt index 2ffdc5c..73cc471 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ ciso8601 requests -websockets \ No newline at end of file +websocket-client \ No newline at end of file diff --git a/setup.py b/setup.py index 81aeaed..6194dc5 100644 --- a/setup.py +++ b/setup.py @@ -16,7 +16,7 @@ author_email='thomgabriel@protonmail.com', url='https://github.com/quan-digital/ftx/tree/v1.2', download_url='https://github.com/quan-digital/ftx/archive/v1.2.tar.gz', - install_requires=['requests', 'ciso8601'], + install_requires=['requests', 'ciso8601', 'websocket-client'], packages=find_packages(), keywords=[ 'ftx', 'bitcoin', 'crypto-api', 'api-connector', 'exchange-api',