@@ -33,10 +33,13 @@ class PartitionExecutor:
3333 rows in the queue
3434 """
3535
def __init__(
    self, batch_snapshot, partition_id, merged_result_set, lazy_decode=False
):
    """Create an executor that processes a single query partition.

    :param batch_snapshot: snapshot whose ``process_query_batch`` is used to
        execute this partition
    :param partition_id: identifier of the partition this executor processes
    :param merged_result_set: shared result set that collects the rows
        produced by all partition executors
    :param lazy_decode: forwarded to ``process_query_batch``; when ``True``
        rows are presumably yielded undecoded for later on-demand decoding —
        see ``MergedResultSet.decode_row``/``decode_column``
    """
    self._batch_snapshot: BatchSnapshot = batch_snapshot
    self._partition_id = partition_id
    self._merged_result_set: MergedResultSet = merged_result_set
    self._lazy_decode = lazy_decode
    # Every executor pushes its results onto the queue shared through the
    # merged result set.
    self._queue: Queue[PartitionExecutorResult] = merged_result_set._queue
4144
4245 def run (self ):
@@ -52,7 +55,9 @@ def run(self):
5255 def __run (self ):
5356 results = None
5457 try :
55- results = self ._batch_snapshot .process_query_batch (self ._partition_id )
58+ results = self ._batch_snapshot .process_query_batch (
59+ self ._partition_id , lazy_decode = self ._lazy_decode
60+ )
5661 for row in results :
5762 if self ._merged_result_set ._metadata is None :
5863 self ._set_metadata (results )
@@ -75,6 +80,7 @@ def _set_metadata(self, results, is_exception=False):
7580 try :
7681 if not is_exception :
7782 self ._merged_result_set ._metadata = results .metadata
83+ self ._merged_result_set ._result_set = results
7884 finally :
7985 self ._merged_result_set .metadata_lock .release ()
8086 self ._merged_result_set .metadata_event .set ()
@@ -94,7 +100,10 @@ class MergedResultSet:
94100 records in the MergedResultSet is not guaranteed.
95101 """
96102
97- def __init__ (self , batch_snapshot , partition_ids , max_parallelism ):
103+ def __init__ (
104+ self , batch_snapshot , partition_ids , max_parallelism , lazy_decode = False
105+ ):
106+ self ._result_set = None
98107 self ._exception = None
99108 self ._metadata = None
100109 self .metadata_event = Event ()
@@ -110,7 +119,7 @@ def __init__(self, batch_snapshot, partition_ids, max_parallelism):
110119 partition_executors = []
111120 for partition_id in partition_ids :
112121 partition_executors .append (
113- PartitionExecutor (batch_snapshot , partition_id , self )
122+ PartitionExecutor (batch_snapshot , partition_id , self , lazy_decode )
114123 )
115124 executor = ThreadPoolExecutor (max_workers = parallelism )
116125 for partition_executor in partition_executors :
@@ -144,3 +153,27 @@ def metadata(self):
def stats(self):
    """Return query statistics for the merged result set.

    Not implemented yet; always returns ``None``.
    """
    # TODO: Implement
    return None
156+
def decode_row(self, row: list) -> list:
    """Decode a row from protobuf values to Python objects.

    This function should only be called for result sets that use
    ``lazy_decode=True``. The list returned by this function is the same as
    the list that the rows iterator would have yielded if ``lazy_decode``
    had been ``False``.

    :param row: a row of undecoded (protobuf) values yielded by this result
        set's iterator
    :returns: a list containing the decoded values of all the columns in the
        given row
    :raises ValueError: if iteration has not started yet, i.e. no underlying
        result set has been captured by ``_set_metadata``
    """
    if self._result_set is None:
        raise ValueError("iterator not started")
    # Delegate to the partition result set stored by _set_metadata.
    return self._result_set.decode_row(row)
168+
def decode_column(self, row: list, column_index: int):
    """Decode a single column from a protobuf value to a Python object.

    This function should only be called for result sets that use
    ``lazy_decode=True``. The object returned by this function is the same
    as the object that the rows iterator would have yielded if
    ``lazy_decode`` had been ``False``.

    :param row: a row of undecoded (protobuf) values yielded by this result
        set's iterator
    :param column_index: zero-based index of the column to decode
    :returns: the decoded column value
    :raises ValueError: if iteration has not started yet, i.e. no underlying
        result set has been captured by ``_set_metadata``
    """
    if self._result_set is None:
        raise ValueError("iterator not started")
    # Delegate to the partition result set stored by _set_metadata.
    return self._result_set.decode_column(row, column_index)
0 commit comments