DatasetExecutor - executes a function in parallel using `Dataset.map`
|             | Type                    | Default | Details                                        |
|-------------|-------------------------|---------|------------------------------------------------|
| function    | typing.Callable         |         | function to be wrapped                         |
| batched     | bool                    |         | if inputs should be batched                    |
| batch_size  | int                     | 1       | batch size (set `batch_size=0` to pass all inputs) |
| concurrency | typing.Optional[int]    | 1       | number of concurrent threads                   |
| map_kwargs  | typing.Optional[dict]   | None    | kwargs for `Dataset.map`                       |
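To illustrate how these parameters interact, here is a minimal sketch of the idea behind such an executor. It uses `concurrent.futures` in place of the real `Dataset.map`, and `parallel_map` is a hypothetical name, not part of the library's API:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional

def parallel_map(function: Callable, inputs: list, batched: bool = False,
                 batch_size: int = 1, concurrency: Optional[int] = 1) -> list:
    """Toy analogue of DatasetExecutor: apply `function` over `inputs`,
    optionally in batches and across threads."""
    if batched:
        size = batch_size or len(inputs)  # batch_size=0 -> one big batch
        batches = [inputs[i:i + size] for i in range(0, len(inputs), size)]
        # collate each batch of row-dicts into a dict of columns
        work = [{k: [row[k] for row in b] for k in b[0]} for b in batches]
    else:
        work = list(inputs)
    with ThreadPoolExecutor(max_workers=concurrency or 1) as pool:
        results = list(pool.map(function, work))
    if not batched:
        return results
    # split each batched column-dict output back into per-row dicts
    flat = []
    for out in results:
        keys = list(out)
        for vals in zip(*(out[k] for k in keys)):
            flat.append(dict(zip(keys, vals)))
    return flat

rows = [{"value": 0.2}, {"value": 0.9}, {"value": 0.7}]
outs = parallel_map(lambda d: {"result": [v > 0.5 for v in d["value"]]},
                    rows, batched=True, batch_size=2, concurrency=2)
```

In the batched path each worker receives a column-oriented dict (e.g. `{"value": [0.2, 0.9]}`), mirroring how `Dataset.map(batched=True)` presents its batches.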
Example usage:

```python
import numpy as np
from pydantic import BaseModel

class TestInput(BaseModel):
    value: float

class TestOutput(BaseModel):
    result: bool

def test_function_hf(input: dict) -> dict:
    return {'result': input['value'] > 0.5}

def test_function_hf_batched(input: dict) -> dict:
    return {'result': [i > 0.5 for i in input['value']]}

np.random.seed(42)
values = np.random.uniform(size=100).tolist()
inputs = [TestInput(value=i) for i in values]
expected_outputs = [TestOutput(result=i > 0.5) for i in values]

# dataset
executor = DatasetExecutor(test_function_hf, batched=False, concurrency=None, batch_size=1)
res11 = executor(inputs)
assert [TestOutput.model_validate(i) for i in res11] == expected_outputs

executor = DatasetExecutor(test_function_hf, batched=False, concurrency=2, batch_size=1)
res12 = executor(inputs)
assert [TestOutput.model_validate(i) for i in res12] == expected_outputs

executor = DatasetExecutor(test_function_hf_batched, batched=True, concurrency=2, batch_size=5)
res13 = executor(inputs)
assert [TestOutput.model_validate(i) for i in res13] == expected_outputs

executor = DatasetExecutor(test_function_hf_batched, batched=True, concurrency=None, batch_size=5)
res14 = executor(inputs)
assert [TestOutput.model_validate(i) for i in res14] == expected_outputs
```
Data Plugin
The HuggingfaceDataPlugin uses a Huggingface Dataset with a faiss embedding index as a data source.
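As a rough illustration of what an embedding-index data source does, here is a sketch that uses brute-force NumPy nearest-neighbor search in place of a real faiss index; `ToyEmbeddingIndex` and its `search` method are hypothetical names, not the plugin's API:

```python
import numpy as np

class ToyEmbeddingIndex:
    """Illustrative stand-in for a faiss-indexed Dataset: stores one
    embedding per row and returns the rows nearest a query vector."""
    def __init__(self, embeddings: np.ndarray, rows: list):
        self.embeddings = embeddings  # shape (n_rows, dim)
        self.rows = rows              # one dict per dataset row

    def search(self, query: np.ndarray, k: int = 1) -> list:
        # L2 distance from the query to every stored embedding
        dists = np.linalg.norm(self.embeddings - query, axis=1)
        nearest = np.argsort(dists)[:k]
        return [self.rows[i] for i in nearest]

rows = [{"text": "alpha"}, {"text": "beta"}, {"text": "gamma"}]
emb = np.array([[0.0, 0.0], [1.0, 0.0], [0.0, 1.0]])
index = ToyEmbeddingIndex(emb, rows)
hits = index.search(np.array([0.9, 0.1]), k=1)
```

A real faiss index replaces the brute-force distance computation with an approximate-nearest-neighbor structure, but the retrieval contract (query vector in, nearest dataset rows out) is the same.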