Skip to content

Commit b273abc

Browse files
authored
feat(python-client): add scan filter support (#2305)
Add support for hashkey and sortkey scan filters. In original logic `generate_next_bytes` function has two problem: a. The input buff (i.e. hashkey) can be either ‘str’ or ‘bytearray’. If it's a str, in-place modification like `buff[pos] += 1` won't work since strings are immutable. b. The pos variable was initialized to a fixed index (len(buff) - 1), which is counterintuitive and could lead to an infinite loop. https://github.com/apache/incubator-pegasus/blob/44400f6e3ca1fb1ce48d04d1c9145aad7ac4e991/python-client/pypegasus/pgclient.py#L613-L624
1 parent 02e3fa2 commit b273abc

3 files changed

Lines changed: 81 additions & 17 deletions

File tree

python-client/pypegasus/base/ttypes.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,20 @@ def write(self, oprot):
4444

4545
def validate(self):
4646
return
47+
48+
def raw(self):
49+
if self._is_str:
50+
return self.data.decode('UTF-8')
51+
else:
52+
return self.data
4753

4854
def __init__(self, data=None):
4955
if isinstance(data,str):
50-
data = data.encode('UTF-8')
51-
self.data = data
56+
self._is_str = True
57+
self.data = data.encode('UTF-8')
58+
else:
59+
self._is_str = False
60+
self.data = data
5261

5362
def __hash__(self):
5463
value = 17

python-client/pypegasus/pgclient.py

Lines changed: 60 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
from pypegasus.replication.ttypes import query_cfg_request
3636
from pypegasus.rrdb import *
3737
from pypegasus.rrdb.ttypes import scan_request, get_scanner_request, update_request, key_value, multi_put_request, \
38-
multi_get_request, multi_remove_request
38+
multi_get_request, multi_remove_request, filter_type
3939
from pypegasus.transport.protocol import *
4040
from pypegasus.utils.tools import restore_key, get_ttl, bytes_cmp, ScanOptions
4141

@@ -504,6 +504,10 @@ def start_scan(self):
504504
request.stop_inclusive = self._scan_options.stop_inclusive
505505
request.batch_size = self._scan_options.batch_size
506506
request.need_check_hash = self._check_hash
507+
request.sort_key_filter_type = self._scan_options.sortkey_filter_type
508+
request.sort_key_filter_pattern = blob(self._scan_options.sortkey_filter_pattern)
509+
request.hash_key_filter_type = self._scan_options.hashkey_filter_type
510+
request.hash_key_filter_pattern = blob(self._scan_options.hashkey_filter_pattern)
507511

508512
op = RrdbGetScannerOperator(self._gpid, request, self._partition_hash)
509513
session = self._table.get_session(self._gpid)
@@ -605,6 +609,9 @@ def generate_key(cls, hash_key, sort_key):
605609
hash_key_len = len(hash_key)
606610
sort_key_len = len(sort_key)
607611

612+
if hash_key_len >= 0xFFFF:
613+
raise ValueError("hash_key length must be less than 65535")
614+
608615
if sort_key_len > 0:
609616
values = (hash_key_len, hash_key, sort_key)
610617
s = struct.Struct('>H'+str(hash_key_len)+'s'+str(sort_key_len)+'s')
@@ -619,24 +626,50 @@ def generate_key(cls, hash_key, sort_key):
619626

620627
@classmethod
621628
def generate_next_bytes(cls, buff):
622-
pos = len(buff) - 1
629+
"""
630+
Increment the last non-0xFF byte in the buffer.
631+
632+
If `buff` is a string, it is assumed to be encoded with 'latin-1' to ensure
633+
a 1:1 mapping between characters and bytes. Unicode strings with characters
634+
outside the 0-255 range will raise a UnicodeEncodeError.
635+
"""
636+
is_str = isinstance(buff, str)
637+
is_ba = isinstance(buff, bytearray)
638+
639+
if is_str:
640+
arr = bytearray(buff.encode('latin-1'))
641+
elif is_ba:
642+
arr = buff
643+
else:
644+
arr = bytearray(buff)
645+
pos = len(arr) - 1
623646
found = False
624647
while pos >= 0:
625-
if ord(buff[pos]) != 0xFF:
626-
buff[pos] += 1
648+
if arr[pos] != 0xFF:
649+
arr[pos] += 1
627650
found = True
628651
break
629-
if found:
630-
return buff
652+
pos -= 1
653+
if not found:
654+
arr += b'\x00'
655+
if is_str:
656+
return arr.decode('latin-1')
657+
elif is_ba:
658+
return arr
631659
else:
632-
return buff + chr(0)
660+
return bytes(arr)
633661

662+
@classmethod
663+
def generate_next_key(cls, hash_key, stop_sort_key):
664+
key = cls.generate_key(hash_key, stop_sort_key)
665+
return blob(cls.generate_next_bytes(key.raw()))
666+
634667
@classmethod
635668
def generate_stop_key(cls, hash_key, stop_sort_key):
636669
if stop_sort_key:
637670
return cls.generate_key(hash_key, stop_sort_key), True
638671
else:
639-
return cls.generate_next_bytes(hash_key), False
672+
return blob(cls.generate_next_bytes(hash_key)), False
640673

641674
def __init__(self, meta_addrs=None, table_name='',
642675
timeout=DEFAULT_TIMEOUT):
@@ -1012,6 +1045,24 @@ def get_scanner(self, hash_key,
10121045
stop_key, stop_inclusive = self.generate_stop_key(hash_key, stop_sort_key)
10131046
if not stop_inclusive:
10141047
scan_options.stop_inclusive = stop_inclusive
1048+
1049+
# limit key range by prefix filter
1050+
if scan_options.sortkey_filter_type == filter_type.FT_MATCH_PREFIX and \
1051+
len(scan_options.sortkey_filter_pattern) > 0:
1052+
prefix_start = self.generate_key(hash_key, scan_options.sortkey_filter_pattern)
1053+
# If the prefix start is after the current start_key, move the scan start to the prefix.
1054+
if bytes_cmp(prefix_start.data, start_key.data) > 0:
1055+
start_key = prefix_start
1056+
scan_options.start_inclusive = True
1057+
1058+
prefix_stop = self.generate_next_key(hash_key, scan_options.sortkey_filter_pattern)
1059+
# If the prefix stop is before or equal to the current stop_key, move the scan stop to the prefix stop.
1060+
# The prefix stop represents the next key after hash_key and sortkey_filter_pattern,
1061+
# so stop_inclusive should be False.
1062+
if bytes_cmp(prefix_stop.data, stop_key.data) <= 0:
1063+
stop_key = prefix_stop
1064+
scan_options.stop_inclusive = False
1065+
10151066
gpid_list = []
10161067
hash_list = []
10171068
r = bytes_cmp(start_key.data, stop_key.data)
@@ -1041,10 +1092,6 @@ def get_unordered_scanners(self, max_split_count, scan_options):
10411092
size = count // split
10421093
more = count % split
10431094

1044-
opt = ScanOptions()
1045-
opt.timeout_millis = scan_options.timeout_millis
1046-
opt.batch_size = scan_options.batch_size
1047-
opt.snapshot = scan_options.snapshot
10481095
scanner_list = []
10491096
for i in range(split):
10501097
gpid_list = []
@@ -1056,6 +1103,6 @@ def get_unordered_scanners(self, max_split_count, scan_options):
10561103
gpid_list.append(all_gpid_list[count])
10571104
hash_list.append(int(count))
10581105

1059-
scanner_list.append(PegasusScanner(self.table, gpid_list, opt, hash_list, True))
1106+
scanner_list.append(PegasusScanner(self.table, gpid_list, scan_options, hash_list, True))
10601107

10611108
return scanner_list

python-client/pypegasus/utils/tools.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,11 @@ def __init__(self):
6363
self.start_inclusive = True
6464
self.stop_inclusive = False
6565
self.snapshot = None # for future use
66-
66+
self.sortkey_filter_type = filter_type.FT_NO_FILTER
67+
self.sortkey_filter_pattern = ""
68+
self.hashkey_filter_type = filter_type.FT_NO_FILTER
69+
self.hashkey_filter_pattern = ""
70+
6771
def __repr__(self):
6872
lst = ['%s=%r' % (key, value)
6973
for key, value in self.__dict__.items()]
@@ -104,11 +108,15 @@ def restore_key(merge_key):
104108

105109
return hash_key, sort_key
106110

111+
# This is to ensure compatibility between different byte-like string representations,
112+
# such as 'bytes' and 'str' in various Python versions.
113+
def bval(ch):
114+
return ch if isinstance(ch, int) else ord(ch)
107115

108116
def bytes_cmp(left, right):
109117
min_len = min(len(left), len(right))
110118
for i in range(min_len):
111-
r = ord(left[i]) - ord(right[i])
119+
r = bval(left[i]) - bval(right[i])
112120
if r != 0:
113121
return r
114122

0 commit comments

Comments
 (0)