1+ import pytest
12import taosws
3+ import time
4+ import utils
25from taosws import Consumer
36
47
@@ -55,7 +58,6 @@ def setup():
5558 "alter table `table` drop column new1" ,
5659 ]
5760 for statement in statements :
58- # print(statement)
5961 cursor .execute (statement )
6062
6163
@@ -69,7 +71,6 @@ def test_tmq():
6971 "min.poll.rows" : "129" ,
7072 }
7173 consumer = Consumer (conf )
72-
7374 consumer .subscribe (["ws_tmq_meta" ])
7475
7576 while 1 :
@@ -95,6 +96,127 @@ def test_tmq():
9596 consumer .unsubscribe ()
9697
9798
99+ @pytest .mark .skipif (not utils .TEST_TD_ENTERPRISE , reason = "only for TDengine Enterprise" )
100+ def test_tmq_with_token ():
101+ conn = taosws .connect ("ws://localhost:6041" )
102+ try :
103+ conn .execute ("drop token if exists token_1772607422" )
104+ conn .execute ("drop topic if exists topic_1772607422" )
105+ conn .execute ("drop database if exists test_1772607422" )
106+ conn .execute ("create database test_1772607422" )
107+ conn .execute ("create topic topic_1772607422 as database test_1772607422" )
108+
109+ rs = conn .query (f"create token token_1772607422 from user { utils .test_username ()} " )
110+ token = next (iter (rs ))[0 ]
111+
112+ consumer = Consumer (conf = {
113+ "td.connect.websocket.scheme" : "ws" ,
114+ "td.connect.ip" : "localhost" ,
115+ "td.connect.port" : 6041 ,
116+ "td.connect.user" : "invalid_user" ,
117+ "td.connect.pass" : "invalid_pass" ,
118+ "td.connect.bearer_token" : token ,
119+ "group.id" : "1001" ,
120+ "client.id" : "1001" ,
121+ })
122+ consumer .subscribe (["topic_1772607422" ])
123+ consumer .unsubscribe ()
124+
125+ consumer2 = Consumer (conf = {
126+ "td.connect.websocket.scheme" : "ws" ,
127+ "td.connect.ip" : "localhost" ,
128+ "td.connect.port" : 6041 ,
129+ "td.connect.user" : "invalid_user" ,
130+ "td.connect.pass" : "invalid_pass" ,
131+ "bearer_token" : token ,
132+ "group.id" : "1001" ,
133+ "client.id" : "1001" ,
134+ })
135+ consumer2 .subscribe (["topic_1772607422" ])
136+ consumer2 .unsubscribe ()
137+
138+ consumer3 = Consumer (conf = {
139+ "td.connect.websocket.scheme" : "ws" ,
140+ "td.connect.ip" : "localhost" ,
141+ "td.connect.port" : 6041 ,
142+ "td.connect.user" : "invalid_user" ,
143+ "td.connect.pass" : "invalid_pass" ,
144+ "td.connect.bearer_token" : token ,
145+ "bearer_token" : "invalid_token" ,
146+ "group.id" : "1001" ,
147+ "client.id" : "1001" ,
148+ })
149+ consumer3 .subscribe (["topic_1772607422" ])
150+ consumer3 .unsubscribe ()
151+
152+ consumer4 = Consumer (dsn = f"ws://invalid_user:invalid_pass@localhost:6041?group.id=1001&client.id=1001&td.connect.bearer_token={ token } " )
153+ consumer4 .subscribe (["topic_1772607422" ])
154+ consumer4 .unsubscribe ()
155+
156+ consumer5 = Consumer (dsn = f"ws://invalid_user:invalid_pass@localhost:6041?group.id=1001&client.id=1001&bearer_token={ token } " )
157+ consumer5 .subscribe (["topic_1772607422" ])
158+ consumer5 .unsubscribe ()
159+
160+ consumer6 = Consumer (dsn = f"ws://invalid_user:invalid_pass@localhost:6041?group.id=1001&client.id=1001&td.connect.bearer_token={ token } &bearer_token=invalid_token" )
161+ consumer6 .subscribe (["topic_1772607422" ])
162+ consumer6 .unsubscribe ()
163+ finally :
164+ time .sleep (3 )
165+ conn .execute ("drop token if exists token_1772607422" )
166+ conn .execute ("drop topic if exists topic_1772607422" )
167+ conn .execute ("drop database if exists test_1772607422" )
168+ conn .close ()
169+
170+
171+ @pytest .mark .skipif (not utils .TEST_TD_ENTERPRISE , reason = "only for TDengine Enterprise" )
172+ def test_tmq_with_invalid_token ():
173+ conn = taosws .connect ("ws://localhost:6041" )
174+ try :
175+ conn .execute ("drop topic if exists topic_1772611547" )
176+ conn .execute ("drop database if exists test_1772611547" )
177+ conn .execute ("create database test_1772611547" )
178+ conn .execute ("create topic topic_1772611547 as database test_1772611547" )
179+
180+ consumer = Consumer (conf = {
181+ "td.connect.websocket.scheme" : "ws" ,
182+ "td.connect.ip" : "localhost" ,
183+ "td.connect.port" : 6041 ,
184+ "td.connect.user" : "invalid_user" ,
185+ "td.connect.pass" : "invalid_pass" ,
186+ "td.connect.bearer_token" : "invalid_token" ,
187+ "group.id" : "1001" ,
188+ "client.id" : "1001" ,
189+ })
190+ with pytest .raises (Exception , match = r"init tscObj with token failed" ):
191+ consumer .subscribe (["topic_1772611547" ])
192+
193+ consumer2 = Consumer (conf = {
194+ "td.connect.websocket.scheme" : "ws" ,
195+ "td.connect.ip" : "localhost" ,
196+ "td.connect.port" : 6041 ,
197+ "td.connect.user" : "invalid_user" ,
198+ "td.connect.pass" : "invalid_pass" ,
199+ "bearer_token" : "invalid_token" ,
200+ "group.id" : "1001" ,
201+ "client.id" : "1001" ,
202+ })
203+ with pytest .raises (Exception , match = r"init tscObj with token failed" ):
204+ consumer2 .subscribe (["topic_1772611547" ])
205+
206+ consumer3 = Consumer (dsn = f"ws://invalid_user:invalid_pass@localhost:6041?group.id=1001&client.id=1001&td.connect.bearer_token=invalid_token" )
207+ with pytest .raises (Exception , match = r"init tscObj with token failed" ):
208+ consumer3 .subscribe (["topic_1772611547" ])
209+
210+ consumer4 = Consumer (dsn = f"ws://invalid_user:invalid_pass@localhost:6041?group.id=1001&client.id=1001&bearer_token=invalid_token" )
211+ with pytest .raises (Exception , match = r"init tscObj with token failed" ):
212+ consumer4 .subscribe (["topic_1772611547" ])
213+ finally :
214+ time .sleep (3 )
215+ conn .execute ("drop topic if exists topic_1772611547" )
216+ conn .execute ("drop database if exists test_1772611547" )
217+ conn .close ()
218+
219+
98220def show_env ():
99221 import os
100222
0 commit comments