-
Notifications
You must be signed in to change notification settings - Fork 721
Expand file tree
/
Copy pathwebsocket_client.py
More file actions
168 lines (140 loc) · 5.04 KB
/
websocket_client.py
File metadata and controls
168 lines (140 loc) · 5.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# cbpro/WebsocketClient.py
# original author: Daniel Paquin
# mongo "support" added by Drew Rice
#
#
# Template object to receive messages from the Coinbase Websocket Feed
from __future__ import print_function
import json
import base64
import hmac
import hashlib
import time
from threading import Thread
from websocket import create_connection, WebSocketConnectionClosedException
from pymongo import MongoClient
from cbpro.cbpro_auth import get_auth_headers
class WebsocketClient(object):
def __init__(self, url="wss://ws-feed.pro.coinbase.com", products=None, message_type="subscribe", mongo_collection=None,
should_print=True, auth=False, api_key="", api_secret="", api_passphrase="", channels=None):
self.url = url
self.products = products
self.channels = channels
self.type = message_type
self.stop = True
self.error = None
self.ws = None
self.thread = None
self.auth = auth
self.api_key = api_key
self.api_secret = api_secret
self.api_passphrase = api_passphrase
self.should_print = should_print
self.mongo_collection = mongo_collection
def start(self):
def _go():
self._connect()
self._listen()
self._disconnect()
self.stop = False
self.on_open()
self.thread = Thread(target=_go, name='socket-worker-thread')
self.keepalive = Thread(target=self._keepalive, name='keepalive-thread')
self.thread.start()
def _connect(self):
if self.products is None:
self.products = ["BTC-USD"]
elif not isinstance(self.products, list):
self.products = [self.products]
if self.url[-1] == "/":
self.url = self.url[:-1]
if self.channels is None:
sub_params = {'type': 'subscribe', 'product_ids': self.products}
else:
sub_params = {'type': 'subscribe', 'product_ids': self.products, 'channels': self.channels}
if self.auth:
timestamp = str(time.time())
message = timestamp + 'GET' + '/users/self/verify'
auth_headers = get_auth_headers(timestamp, message, self.api_key, self.api_secret, self.api_passphrase)
sub_params['signature'] = auth_headers['CB-ACCESS-SIGN']
sub_params['key'] = auth_headers['CB-ACCESS-KEY']
sub_params['passphrase'] = auth_headers['CB-ACCESS-PASSPHRASE']
sub_params['timestamp'] = auth_headers['CB-ACCESS-TIMESTAMP']
try:
self.ws = create_connection(self.url)
self.ws.send(json.dumps(sub_params))
except Exception as e:
self.on_error(e)
def _keepalive(self, interval=30):
while self.ws and self.ws.connected:
self.ws.ping("keepalive")
time.sleep(interval)
def _listen(self):
self.keepalive.start()
while not self.stop:
try:
data = self.ws.recv()
msg = json.loads(data)
except ValueError as e:
self.on_error(e)
except Exception as e:
self.on_error(e)
else:
self.on_message(msg)
def _disconnect(self):
try:
if self.ws:
self.ws.close()
except WebSocketConnectionClosedException as e:
pass
finally:
if self.keepalive.is_alive():
self.keepalive.join()
self.on_close()
def close(self):
self.stop = True # will only disconnect after next msg recv
self._disconnect() # force disconnect so threads can join
self.thread.join()
def on_open(self):
if self.should_print:
print("-- Subscribed! --\n")
def on_close(self):
if self.should_print:
print("\n-- Socket Closed --")
def on_message(self, msg):
if self.should_print:
print(msg)
if self.mongo_collection: # dump JSON to given mongo collection
self.mongo_collection.insert_one(msg)
def on_error(self, e, data=None):
self.error = e
self.stop = True
print('{} - data: {}'.format(e, data))
if __name__ == "__main__":
import sys
import cbpro
import time
class MyWebsocketClient(cbpro.WebsocketClient):
def on_open(self):
self.url = "wss://ws-feed.pro.coinbase.com/"
self.products = ["BTC-USD", "ETH-USD"]
self.message_count = 0
print("Let's count the messages!")
def on_message(self, msg):
print(json.dumps(msg, indent=4, sort_keys=True))
self.message_count += 1
def on_close(self):
print("-- Goodbye! --")
wsClient = MyWebsocketClient()
wsClient.start()
print(wsClient.url, wsClient.products)
try:
while True:
print("\nMessageCount =", "%i \n" % wsClient.message_count)
time.sleep(1)
except KeyboardInterrupt:
wsClient.close()
if wsClient.error:
sys.exit(1)
else:
sys.exit(0)