class TestInput(BaseModel):
    """Input schema used by the executor tests: a single float payload."""
    value: float  # uniform draw in [0, 1); compared against the 0.5 threshold below
class TestOutput(BaseModel):
    """Output schema used by the executor tests: a single boolean flag."""
    result: bool  # True when the corresponding input value exceeded 0.5
def test_function(input: TestInput) -> TestOutput:
    """Threshold check for one item: result is True when value > 0.5."""
    is_above = input.value > 0.5
    return TestOutput(result=is_above)
def test_function_batched(inputs: list[TestInput]) -> list[TestOutput]:
    """Threshold check for a whole batch; one TestOutput per TestInput."""
    outputs: list[TestOutput] = []
    for item in inputs:
        outputs.append(TestOutput(result=item.value > 0.5))
    return outputs
# Fixture: 100 seeded uniform draws; expected flag is simply value > 0.5.
np.random.seed(42)
values = np.random.uniform(size=100).tolist()
inputs = [TestInput(value=v) for v in values]
expected_outputs = [TestOutput(result=v > 0.5) for v in values]

# standard
# Unbatched: Executor calls test_function once per input.
executor = Executor(test_function, batched=False)
res1 = executor(inputs)
assert res1 == expected_outputs

# Batched: inputs are chunked into groups of 5 for test_function_batched.
executor = Executor(test_function_batched, batched=True, batch_size=5)
res2 = executor(inputs)
assert res2 == expected_outputs
executor
executor functions and classes
Executors
Executor classes are helper classes for batching and parallel processing
Executor
Executor (function:Callable, batched:bool, batch_size:int=1)
Basic Executor class. Batches inputs, sends batches to function, unbatches outputs
|  | Type | Default | Details |
|---|---|---|---|
| function | typing.Callable |  | function to be wrapped |
| batched | bool |  | if inputs should be batched |
| batch_size | int | 1 | batch size (set batch_size=0 to pass all inputs) |
ProcessExecutor
ProcessExecutor (function:Callable, batched:bool, batch_size:int=1, concurrency:Optional[int]=1)
ProcessExecutor - executes function with multiprocessing using ProcessPoolExecutor
|  | Type | Default | Details |
|---|---|---|---|
| function | typing.Callable |  | function to be wrapped |
| batched | bool |  | if inputs should be batched |
| batch_size | int | 1 | batch size (set batch_size=0 to pass all inputs) |
| concurrency | typing.Optional[int] | 1 | number of concurrent processes |
class TestInput(BaseModel):
    """Input schema for the ProcessExecutor tests: a single float payload."""
    value: float  # uniform draw in [0, 1); classified against the 0.5 threshold
class TestOutput(BaseModel):
    """Output schema for the ProcessExecutor tests: a single boolean flag."""
    result: bool  # True when the corresponding input value exceeded 0.5
def test_function(input: TestInput) -> TestOutput:
    # Single-item check: flag inputs strictly greater than 0.5.
    return TestOutput(result=(0.5 < input.value))
def test_function_batched(inputs: list[TestInput]) -> list[TestOutput]:
    # Batched check: apply the 0.5 threshold to every member of the batch.
    flags = (member.value > 0.5 for member in inputs)
    return [TestOutput(result=flag) for flag in flags]
# Rebuild the seeded fixture for the process-based runs.
np.random.seed(42)
values = np.random.uniform(size=100).tolist()
inputs = [TestInput(value=draw) for draw in values]
expected_outputs = [TestOutput(result=draw > 0.5) for draw in values]

# process
# Unbatched, single worker process.
executor = ProcessExecutor(test_function, batched=False, concurrency=1)
res3 = executor(inputs)
assert res3 == expected_outputs

# Unbatched, two worker processes.
executor = ProcessExecutor(test_function, batched=False, concurrency=2)
res4 = executor(inputs)
assert res4 == expected_outputs

# Batched (5 per batch), default concurrency.
executor = ProcessExecutor(test_function_batched, batched=True, batch_size=5)
res5 = executor(inputs)
assert res5 == expected_outputs

# Batched (5 per batch), two worker processes.
executor = ProcessExecutor(test_function_batched, batched=True, batch_size=5, concurrency=2)
res6 = executor(inputs)
assert res6 == expected_outputs
ThreadExecutor
ThreadExecutor (function:Callable, batched:bool, batch_size:int=1, concurrency:Optional[int]=1)
ThreadExecutor - executes function with multiple threads using ThreadPoolExecutor
|  | Type | Default | Details |
|---|---|---|---|
| function | typing.Callable |  | function to be wrapped |
| batched | bool |  | if inputs should be batched |
| batch_size | int | 1 | batch size (set batch_size=0 to pass all inputs) |
| concurrency | typing.Optional[int] | 1 | number of concurrent threads |
class TestInput(BaseModel):
    """Input schema for the ThreadExecutor tests: a single float payload."""
    value: float  # uniform draw in [0, 1); classified against the 0.5 threshold
class TestOutput(BaseModel):
    """Output schema for the ThreadExecutor tests: a single boolean flag."""
    result: bool  # True when the corresponding input value exceeded 0.5
def test_function(input: TestInput) -> TestOutput:
    """Map one TestInput to a TestOutput flagging values above 0.5."""
    value = input.value
    return TestOutput(result=value > 0.5)
def test_function_batched(inputs: list[TestInput]) -> list[TestOutput]:
    """Map a batch of TestInput to TestOutput flags (value > 0.5)."""
    flags = [item.value > 0.5 for item in inputs]
    return [TestOutput(result=f) for f in flags]
# Rebuild the seeded fixture for the thread-based runs.
np.random.seed(42)
values = np.random.uniform(size=100).tolist()
inputs = [TestInput(value=sample) for sample in values]
expected_outputs = [TestOutput(result=sample > 0.5) for sample in values]

# thread
# Unbatched, single worker thread.
executor = ThreadExecutor(test_function, batched=False, concurrency=1)
res7 = executor(inputs)
assert res7 == expected_outputs

# Unbatched, two worker threads.
executor = ThreadExecutor(test_function, batched=False, concurrency=2)
res8 = executor(inputs)
assert res8 == expected_outputs

# Batched (5 per batch), default concurrency.
executor = ThreadExecutor(test_function_batched, batched=True, batch_size=5)
res9 = executor(inputs)
assert res9 == expected_outputs

# Batched (5 per batch), two worker threads.
executor = ThreadExecutor(test_function_batched, batched=True, batch_size=5, concurrency=2)
res10 = executor(inputs)
assert res10 == expected_outputs
APIExecutor
APIExecutor (url:str, batch_size:int=1, concurrency:Optional[int]=1)
Batches inputs, sends batches as requests to the API at url, unbatches outputs
|  | Type | Default | Details |
|---|---|---|---|
| url | str |  | API URL |
| batch_size | int | 1 | batch size (set batch_size=0 to pass all inputs) |
| concurrency | typing.Optional[int] | 1 | number of concurrent threads |
# only needed for jupyter
# nest_asyncio lets the executor's event loop run inside a running notebook loop
import nest_asyncio
nest_asyncio.apply()

# Five dummy records shaped like the data_source request payload.
inputs = [{'embedding' : np.random.randn(64).tolist(), 'item':None, 'data':None} for _ in range(5)]

# batch_size=0 sends all inputs in a single request to the local service.
executor = APIExecutor('http://localhost:7888/data_source', batch_size=0)
res = executor(inputs)