executor

executor functions and classes

Executors

Executor classes are helper classes for batching and parallel processing


source

Executor

 Executor (function:Callable, batched:bool, batch_size:int=1)

Basic Executor class. Batches inputs, sends batches to function, unbatches outputs

Type Default Details
function typing.Callable function to be wrapped
batched bool if inputs should be batched
batch_size int 1 batch size (set batch_size=0 to pass all inputs)
class TestInput(BaseModel):
    """Minimal input model for the executor examples: one float value."""
    value: float
        
class TestOutput(BaseModel):
    """Minimal output model for the executor examples: one boolean result."""
    result: bool
        
def test_function(input: TestInput) -> TestOutput:
    """Return a TestOutput flagging whether the input's value exceeds 0.5."""
    exceeds_threshold = input.value > 0.5
    return TestOutput(result=exceeds_threshold)

def test_function_batched(inputs: list[TestInput]) -> list[TestOutput]:
    """Batched variant of test_function: flag each input whose value exceeds 0.5."""
    outputs = []
    for item in inputs:
        outputs.append(TestOutput(result=item.value > 0.5))
    return outputs
        
# Deterministic fixture: 100 uniform draws in [0, 1) and their expected flags.
np.random.seed(42)
values = np.random.uniform(size=100).tolist()
inputs = [TestInput(value=v) for v in values]
expected_outputs = [TestOutput(result=v > 0.5) for v in values]

# standard

# Unbatched: the executor invokes test_function once per input.
executor = Executor(test_function, batched=False)
res1 = executor(inputs)
assert res1 == expected_outputs

# Batched: inputs are grouped into lists of 5 for test_function_batched.
executor = Executor(test_function_batched, batched=True, batch_size=5)
res2 = executor(inputs)
assert res2 == expected_outputs

source

ProcessExecutor

 ProcessExecutor (function:Callable, batched:bool, batch_size:int=1,
                  concurrency:Optional[int]=1)

ProcessExecutor - executes function with multiprocessing using ProcessPoolExecutor

Type Default Details
function typing.Callable function to be wrapped
batched bool if inputs should be batched
batch_size int 1 batch size (set batch_size=0 to pass all inputs)
concurrency typing.Optional[int] 1 number of concurrent processes
class TestInput(BaseModel):
    """Minimal input model for the executor examples: one float value."""
    value: float
        
class TestOutput(BaseModel):
    """Minimal output model for the executor examples: one boolean result."""
    result: bool
        
def test_function(input: TestInput) -> TestOutput:
    """Return a TestOutput flagging whether the input's value exceeds 0.5."""
    exceeds_threshold = input.value > 0.5
    return TestOutput(result=exceeds_threshold)

def test_function_batched(inputs: list[TestInput]) -> list[TestOutput]:
    """Batched variant of test_function: flag each input whose value exceeds 0.5."""
    outputs = []
    for item in inputs:
        outputs.append(TestOutput(result=item.value > 0.5))
    return outputs
        
# Deterministic fixture: 100 uniform draws in [0, 1) and their expected flags.
np.random.seed(42)
values = np.random.uniform(size=100).tolist()
inputs = [TestInput(value=v) for v in values]
expected_outputs = [TestOutput(result=v > 0.5) for v in values]

# process

# Unbatched, single worker process.
executor = ProcessExecutor(test_function, batched=False, concurrency=1)
res3 = executor(inputs)
assert res3 == expected_outputs

# Unbatched, two worker processes; ordering of results must be preserved.
executor = ProcessExecutor(test_function, batched=False, concurrency=2)
res4 = executor(inputs)
assert res4 == expected_outputs

# Batches of 5, default single process.
executor = ProcessExecutor(test_function_batched, batched=True, batch_size=5)
res5 = executor(inputs)
assert res5 == expected_outputs

# Batches of 5 spread across two worker processes.
executor = ProcessExecutor(test_function_batched, batched=True, batch_size=5, concurrency=2)
res6 = executor(inputs)
assert res6 == expected_outputs

source

ThreadExecutor

 ThreadExecutor (function:Callable, batched:bool, batch_size:int=1,
                 concurrency:Optional[int]=1)

ThreadExecutor - executes function with multiple threads using ThreadPoolExecutor

Type Default Details
function typing.Callable function to be wrapped
batched bool if inputs should be batched
batch_size int 1 batch size (set batch_size=0 to pass all inputs)
concurrency typing.Optional[int] 1 number of concurrent threads
class TestInput(BaseModel):
    """Minimal input model for the executor examples: one float value."""
    value: float
        
class TestOutput(BaseModel):
    """Minimal output model for the executor examples: one boolean result."""
    result: bool
        
def test_function(input: TestInput) -> TestOutput:
    """Return a TestOutput flagging whether the input's value exceeds 0.5."""
    exceeds_threshold = input.value > 0.5
    return TestOutput(result=exceeds_threshold)

def test_function_batched(inputs: list[TestInput]) -> list[TestOutput]:
    """Batched variant of test_function: flag each input whose value exceeds 0.5."""
    outputs = []
    for item in inputs:
        outputs.append(TestOutput(result=item.value > 0.5))
    return outputs
        
# Deterministic fixture: 100 uniform draws in [0, 1) and their expected flags.
np.random.seed(42)
values = np.random.uniform(size=100).tolist()
inputs = [TestInput(value=v) for v in values]
expected_outputs = [TestOutput(result=v > 0.5) for v in values]

# thread

# Unbatched, single worker thread.
executor = ThreadExecutor(test_function, batched=False, concurrency=1)
res7 = executor(inputs)
assert res7 == expected_outputs

# Unbatched, two worker threads; ordering of results must be preserved.
executor = ThreadExecutor(test_function, batched=False, concurrency=2)
res8 = executor(inputs)
assert res8 == expected_outputs

# Batches of 5, default single thread.
executor = ThreadExecutor(test_function_batched, batched=True, batch_size=5)
res9 = executor(inputs)
assert res9 == expected_outputs

# Batches of 5 spread across two worker threads.
executor = ThreadExecutor(test_function_batched, batched=True, batch_size=5, concurrency=2)
res10 = executor(inputs)
assert res10 == expected_outputs

source

APIExecutor

 APIExecutor (url:str, batch_size:int=1, concurrency:Optional[int]=1)

APIExecutor - batches inputs, sends each batch to the API at `url`, unbatches outputs

Type Default Details
url str API URL
batch_size int 1 batch size (set batch_size=0 to pass all inputs)
concurrency typing.Optional[int] 1 number of concurrent threads
# only needed for jupyter: nest_asyncio lets an event loop run inside Jupyter's own loop
import nest_asyncio
nest_asyncio.apply()

# Five payloads with random 64-d embeddings; batch_size=0 sends all inputs in one request.
# NOTE(review): requires a service listening on localhost:7888 — this cell is I/O only.
inputs = [
    {'embedding': np.random.randn(64).tolist(), 'item': None, 'data': None}
    for _ in range(5)
]
executor = APIExecutor('http://localhost:7888/data_source', batch_size=0)
res = executor(inputs)