From 03deced63f4c8a3466f5b8c97c691e3650cf315c Mon Sep 17 00:00:00 2001
From: Vivek Panyam
Date: Wed, 13 Jan 2021 19:00:46 -0500
Subject: [PATCH] [Python] Add initial support for asyncio

---
 source/neuropod/bindings/neuropod_native.cc   | 33 +++++++
 source/python/neuropod/loader.py              | 27 ++++++
 .../python/neuropod/tests/test_async_infer.py | 94 +++++++++++++++++++
 3 files changed, 154 insertions(+)
 create mode 100644 source/python/neuropod/tests/test_async_infer.py

diff --git a/source/neuropod/bindings/neuropod_native.cc b/source/neuropod/bindings/neuropod_native.cc
index ccbce8e2..59693834 100644
--- a/source/neuropod/bindings/neuropod_native.cc
+++ b/source/neuropod/bindings/neuropod_native.cc
@@ -61,6 +61,38 @@ py::dict infer(Neuropod &neuropod, py::dict &inputs_dict)
     return to_numpy_dict(*outputs);
 }
 
+py::object infer_async(Neuropod &neuropod, py::dict &inputs_dict)
+{
+    // Convert from a py::dict of numpy arrays to an unordered_map of `NeuropodTensor`s
+    auto allocator = neuropod.get_tensor_allocator();
+    NeuropodValueMap inputs = from_numpy_dict(*allocator, inputs_dict);
+
+    // Run inference (this currently blocks the calling python thread)
+    auto outputs = neuropod.infer(inputs);
+
+    // Convert the outputs to a python dict of numpy arrays
+    auto result = to_numpy_dict(*outputs);
+
+    // Get the event loop and create a future
+    // (`get_running_loop` can't be used because callers may create the future before the
+    // loop is running, e.g. `loop.run_until_complete(model.infer_async(inputs))`)
+    py::object loop = py::module::import("asyncio").attr("get_event_loop")();
+    py::object future = loop.attr("create_future")();
+
+    // Resolve the future from a loop callback. `call_soon_threadsafe` can be called from
+    // another thread (still need to keep track of the GIL).
+    // The future may already be cancelled by the time the callback runs and `set_result`
+    // raises `InvalidStateError` on a future that is done, so check before setting it.
+    loop.attr("call_soon_threadsafe")(py::cpp_function([future, result]() {
+        if (!future.attr("done")().cast<bool>())
+        {
+            future.attr("set_result")(result);
+        }
+    }));
+
+    return future;
+}
+
 py::array deserialize_tensor_binding(py::bytes buffer)
 {
     // Deserialize to a NeuropodTensor
@@ -157,6 +189,7 @@ PYBIND11_MODULE(neuropod_native, m)
                          const std::vector &default_backend_overrides,
                          py::kwargs kwargs) { return make_neuropod(kwargs, path, default_backend_overrides); }))
         .def("infer", &infer)
+        .def("infer_async", &infer_async)
         .def("get_inputs", &Neuropod::get_inputs)
         .def("get_outputs", &Neuropod::get_outputs)
         .def("get_name", &Neuropod::get_name)
diff --git a/source/python/neuropod/loader.py b/source/python/neuropod/loader.py
index b7397d74..c0314a55 100644
--- a/source/python/neuropod/loader.py
+++ b/source/python/neuropod/loader.py
@@ -130,6 +130,33 @@ def infer(self, inputs):
         inputs = maybe_convert_bindings_types(inputs)
         return self.model.infer(inputs)
 
+    def infer_async(self, inputs):
+        """
+        Run inference in an asyncio compatible way.
+
+        Callers should ensure they limit the number of in-flight requests to avoid memory
+        growth (e.g. by using asyncio.Semaphore)
+
+        Note: this method currently blocks the current python thread. This will change
+        in the future
+
+        Note: the returned future is created on the loop returned by
+        `asyncio.get_event_loop()` at the time of the call, so that loop must be the
+        one used to await it
+
+        :param inputs: A dict mapping input names to values. This must match the input
+                       spec in the neuropod config for the loaded model.
+                       Ex: {'x1': np.array([5]), 'x2': np.array([6])}
+                       *Note:* all the keys in this dict must be strings and all the
+                       values must be numpy arrays
+
+        :returns: asyncio.Future containing the same result type as `infer` above
+        """
+        inputs = maybe_convert_bindings_types(inputs)
+
+        # Pass the inputs to the native code and return a future
+        return self.model.infer_async(inputs)
+
     def __enter__(self):
         # Needed in order to be used as a contextmanager
         return self
diff --git a/source/python/neuropod/tests/test_async_infer.py b/source/python/neuropod/tests/test_async_infer.py
new file mode 100644
index 00000000..da44b09f
--- /dev/null
+++ b/source/python/neuropod/tests/test_async_infer.py
@@ -0,0 +1,94 @@
+# Copyright (c) 2021 UATC, LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import six
+import numpy as np
+import unittest
+from testpath.tempdir import TemporaryDirectory
+
+from neuropod.loader import load_neuropod
+from neuropod.packagers import create_python_neuropod
+from neuropod.tests.utils import get_addition_model_spec
+
+ADDITION_MODEL_SOURCE = """
+import sys
+
+def addition_model(x, y):
+    return {
+        "out": x + y
+    }
+
+def get_model(_):
+    return addition_model
+"""
+
+
+@unittest.skipIf(six.PY2, "Skipping asyncio test for Python 2")
+class TestAsync(unittest.TestCase):
+    def package_simple_addition_model(self, test_dir, do_fail=False):
+        """Package the addition model under `test_dir` and return the neuropod path."""
+        neuropod_path = os.path.join(test_dir, "test_neuropod")
+        model_code_dir = os.path.join(test_dir, "model_code")
+        os.makedirs(model_code_dir)
+        with open(os.path.join(model_code_dir, "addition_model.py"), "w") as f:
+            f.write(ADDITION_MODEL_SOURCE)
+
+        # `create_python_neuropod` runs inference with the test data immediately
+        # after creating the neuropod. Raises a ValueError if the model output
+        # does not match the expected output.
+        create_python_neuropod(
+            neuropod_path=neuropod_path,
+            model_name="addition_model",
+            data_paths=[],
+            code_path_spec=[
+                {
+                    "python_root": model_code_dir,
+                    "dirs_to_package": [""],  # Package everything in the python_root
+                }
+            ],
+            entrypoint_package="addition_model",
+            entrypoint="get_model",
+            # Get the input/output spec along with test data
+            **get_addition_model_spec(do_fail=do_fail)
+        )
+
+        return neuropod_path
+
+    def test_async_inference(self):
+        """Run the future from `infer_async` to completion and check its result."""
+        import asyncio  # Imported lazily so this module still imports on Python 2
+
+        # Install a fresh loop; the native code finds it via `get_event_loop()`
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+        self.addCleanup(asyncio.set_event_loop, None)
+        self.addCleanup(loop.close)
+
+        with TemporaryDirectory() as test_dir:
+            path = self.package_simple_addition_model(test_dir)
+            input_data = {
+                "x": np.array([0.5], dtype=np.float32),
+                "y": np.array([1.5], dtype=np.float32),
+            }
+
+            with load_neuropod(path) as model:
+                result = loop.run_until_complete(model.infer_async(input_data))
+
+                # Ensure the output is what we expect
+                self.assertEqual(result["out"][0], 2.0)
+
+
+if __name__ == "__main__":
+    unittest.main()