python - Is there a standard approach to returning values from coroutine endpoints -
my question:
i know if there "best practice" pattern in python returning values coroutine endpoints (aka "sink" or "consumer"). more generally, how approach following scenario?
my scenario:
i have (producer) > (filter) > (consumer)
coroutine pipeline process text-based table , build list of dictionaries it. object built in consumer
returned original caller of producer
.
my approach:
my approach has been set unique finish-processing signal each coroutine checks for. if hears signal, passes on signal child , yields returned value. consumer
yields current value.
alternative approaches:
i considered:
- using global hold desired object "returned" caller.
- a class-based approach regular subroutines.
reasons why should maybe reconsider these scenario welcome.
my implementation:
here simplified version of have done, key components included.
import uuid finish_processing_signal = uuid.uuid4() def coroutine(func): def start(*args,**kwargs): cr = func(*args,**kwargs) cr.next() return cr return start # sink @coroutine def list_builder(): # accepts objects , adds them list _list = [] try: while true: data = (yield) if data finish_processing_signal: yield _list break _list.append(data) except generatorexit: pass # filter @coroutine def user_data_filter(target=none): if target none: target = list_builder() header = "-+-" footer = "transfer packets" username = "user name" fullname = "full name" note = "description" try: while true: user = {} data = (yield) if data finish_processing_signal: yield target.send(finish_processing_signal) break line = data if header in line: while true: line = (yield) if footer in line: target.send(user) break elif username in line: user["username"] = line.split('|')[1] elif fullname in line: user["fullname"] = line.split('|')[1] elif note in line: user["note"] = line.split('|')[1] except generatorexit: target.close() # producer def process_users_table(table, target=none): if target none: target = user_data_filter() lines = table.split('\r\n') line in lines: target.send(line) processed_data = target.send(finish_processing_signal) return processed_data if __name__ == '__main__': test_users_table = \ """ item |value\r\n ----------------+-----------------------\r\n user name |alice\r\n full name |alice doe\r\n description |\r\n transfer packets|0\r\n ----------------+-----------------------\r\n user name |bob\r\n full name |bob tables\r\n description |\r\n transfer packets|0\r\n """ users = process_users_table(test_users_table) print users
your method of signaling consumer terminate fine , in harmony if using multiprocessing or threaded queue. however, generators have way throw exceptions (rather sending values) , the purpose of throw
precisely signal events or changes in state generator. moreover, when exception thrown generator,
[i]f generator catches exception , yields value, return value of g.throw().
that seems suited use case. instead of sending finish_processing_signal
value, throw finish_processing_signal
exception, , use try..except
yield final value.
class finish_processing_signal(exception): pass def coroutine(func): def start(*args,**kwargs): cr = func(*args,**kwargs) cr.next() return cr return start # sink @coroutine def list_builder(): # accepts objects , adds them list _list = [] try: while true: data = (yield) _list.append(data) except finish_processing_signal: yield _list # filter @coroutine def user_data_filter(target=list_builder()): header = "-+-" footer = "transfer packets" username = "user name" fullname = "full name" note = "description" try: while true: user = {} data = (yield) line = data if header in line: while true: line = (yield) if footer in line: target.send(user) break elif username in line: user["username"] = line.split('|')[1] elif fullname in line: user["fullname"] = line.split('|')[1] elif note in line: user["note"] = line.split('|')[1] except finish_processing_signal err: # pass along exception target, , yield result # caller yield target.throw(err) # producer def process_users_table(table, target=user_data_filter()): lines = table.split('\r\n') line in lines: target.send(line) processed_data = target.throw(finish_processing_signal) # processed_data = target.close() return processed_data if __name__ == '__main__': test_users_table = \ """ item |value\r\n ----------------+-----------------------\r\n user name |alice\r\n full name |alice doe\r\n description |\r\n transfer packets|0\r\n ----------------+-----------------------\r\n user name |bob\r\n full name |bob tables\r\n description |\r\n transfer packets|0\r\n """ users = process_users_table(test_users_table) print users
Comments
Post a Comment