-
Notifications
You must be signed in to change notification settings - Fork 17
Implement remote evaluation and distributed garbage collection #46
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,7 @@ | |
| #include <string> | ||
| #include <thread> | ||
| #include <tuple> | ||
| #include <variant> | ||
|
|
||
| #include "python.hpp" | ||
|
|
||
|
|
@@ -138,15 +139,14 @@ auto ElixirPythonxJanitor = fine::Atom("Elixir.Pythonx.Janitor"); | |
| auto ElixirPythonxObject = fine::Atom("Elixir.Pythonx.Object"); | ||
| auto decref = fine::Atom("decref"); | ||
| auto integer = fine::Atom("integer"); | ||
| auto lines = fine::Atom("lines"); | ||
| auto list = fine::Atom("list"); | ||
| auto map = fine::Atom("map"); | ||
| auto map_set = fine::Atom("map_set"); | ||
| auto output = fine::Atom("output"); | ||
| auto remote_info = fine::Atom("remote_info"); | ||
| auto resource = fine::Atom("resource"); | ||
| auto traceback = fine::Atom("traceback"); | ||
| auto tuple = fine::Atom("tuple"); | ||
| auto type = fine::Atom("type"); | ||
| auto value = fine::Atom("value"); | ||
| } // namespace atoms | ||
|
|
||
| struct PyObjectResource { | ||
|
|
@@ -186,8 +186,26 @@ struct PyObjectResource { | |
|
|
||
| FINE_RESOURCE(PyObjectResource); | ||
|
|
||
| // A resource that notifies the given process upon garbage collection. | ||
| struct GCNotifier { | ||
| ErlNifPid pid; | ||
| ErlNifEnv *message_env; | ||
| ERL_NIF_TERM message_term; | ||
|
|
||
| GCNotifier(ErlNifPid pid, ErlNifEnv *message_env, ERL_NIF_TERM message_term) | ||
| : pid(pid), message_env(message_env), message_term(message_term) {} | ||
|
|
||
| void destructor(ErlNifEnv *env) { | ||
| enif_send(env, &pid, message_env, message_term); | ||
| enif_free_env(message_env); | ||
| } | ||
| }; | ||
|
|
||
| FINE_RESOURCE(GCNotifier); | ||
|
|
||
| struct ExObject { | ||
| fine::ResourcePtr<PyObjectResource> resource; | ||
| std::optional<fine::Term> remote_info; | ||
|
|
||
| ExObject() {} | ||
| ExObject(fine::ResourcePtr<PyObjectResource> resource) : resource(resource) {} | ||
|
|
@@ -196,26 +214,21 @@ struct ExObject { | |
|
|
||
| static constexpr auto fields() { | ||
| return std::make_tuple( | ||
| std::make_tuple(&ExObject::resource, &atoms::resource)); | ||
| std::make_tuple(&ExObject::resource, &atoms::resource), | ||
| std::make_tuple(&ExObject::remote_info, &atoms::remote_info)); | ||
| } | ||
| }; | ||
|
|
||
| struct ExError { | ||
| ExObject type; | ||
| ExObject value; | ||
| ExObject traceback; | ||
| std::vector<fine::Term> lines; | ||
|
|
||
| ExError() {} | ||
| ExError(ExObject type, ExObject value, ExObject traceback) | ||
| : type(type), value(value), traceback(traceback) {} | ||
| ExError(std::vector<fine::Term> lines) : lines(lines) {} | ||
|
|
||
| static constexpr auto module = &atoms::ElixirPythonxError; | ||
|
|
||
| static constexpr auto fields() { | ||
| return std::make_tuple( | ||
| std::make_tuple(&ExError::type, &atoms::type), | ||
| std::make_tuple(&ExError::value, &atoms::value), | ||
| std::make_tuple(&ExError::traceback, &atoms::traceback)); | ||
| return std::make_tuple(std::make_tuple(&ExError::lines, &atoms::lines)); | ||
| } | ||
|
|
||
| static constexpr auto is_exception = true; | ||
|
|
@@ -228,30 +241,91 @@ struct EvalInfo { | |
| std::thread::id thread_id; | ||
| }; | ||
|
|
||
| void raise_py_error(ErlNifEnv *env) { | ||
| void raise_formatting_error_if_failed(PyObjectPtr py_object) { | ||
| if (py_object == NULL) { | ||
| throw std::runtime_error("failed while formatting a python error"); | ||
| } | ||
| } | ||
|
|
||
| void raise_formatting_error_if_failed(const char *buffer) { | ||
| if (buffer == NULL) { | ||
| throw std::runtime_error("failed while formatting a python error"); | ||
| } | ||
| } | ||
|
|
||
| void raise_formatting_error_if_failed(Py_ssize_t size) { | ||
| if (size == -1) { | ||
| throw std::runtime_error("failed while formatting a python error"); | ||
| } | ||
| } | ||
|
|
||
| ExError build_py_error_from_current(ErlNifEnv *env) { | ||
| PyObjectPtr py_type, py_value, py_traceback; | ||
| PyErr_Fetch(&py_type, &py_value, &py_traceback); | ||
|
|
||
| // If the error indicator was set, type should not be NULL, but value | ||
| // and traceback might | ||
|
|
||
| // and traceback might. | ||
| if (py_type == NULL) { | ||
| throw std::runtime_error( | ||
| "raise_py_error should only be called when the error indicator is set"); | ||
| throw std::runtime_error("build_py_error_from_current should only be " | ||
| "called when the error indicator is set"); | ||
| } | ||
|
|
||
| auto type = ExObject(fine::make_resource<PyObjectResource>(py_type)); | ||
|
|
||
| // Default value and traceback to None object | ||
| // Default value and traceback to None object. | ||
| py_value = py_value == NULL ? Py_BuildValue("") : py_value; | ||
| py_traceback = py_traceback == NULL ? Py_BuildValue("") : py_traceback; | ||
|
|
||
| // Format the exception. Note that if anything raises an error here, | ||
| // we throw a runtime exception, instead of a Python one, otherwise | ||
| // we could go into an infinite loop. | ||
|
|
||
| auto py_traceback_module = PyImport_ImportModule("traceback"); | ||
| raise_formatting_error_if_failed(py_traceback_module); | ||
| auto py_traceback_module_guard = PyDecRefGuard(py_traceback_module); | ||
|
|
||
| auto value = fine::make_resource<PyObjectResource>( | ||
| py_value == NULL ? Py_BuildValue("") : py_value); | ||
| auto format_exception = | ||
| PyObject_GetAttrString(py_traceback_module, "format_exception"); | ||
| raise_formatting_error_if_failed(format_exception); | ||
| auto format_exception_guard = PyDecRefGuard(format_exception); | ||
|
|
||
| auto traceback = fine::make_resource<PyObjectResource>( | ||
| py_traceback == NULL ? Py_BuildValue("") : py_traceback); | ||
| auto format_exception_args = PyTuple_Pack(3, py_type, py_value, py_traceback); | ||
| raise_formatting_error_if_failed(format_exception_args); | ||
| auto format_exception_args_guard = PyDecRefGuard(format_exception_args); | ||
|
|
||
| auto error = ExError(type, value, traceback); | ||
| fine::raise(env, error); | ||
| auto py_lines = PyObject_Call(format_exception, format_exception_args, NULL); | ||
| raise_formatting_error_if_failed(py_lines); | ||
| auto py_lines_guard = PyDecRefGuard(py_lines); | ||
|
|
||
| auto size = PyList_Size(py_lines); | ||
| raise_formatting_error_if_failed(size); | ||
|
|
||
| auto terms = std::vector<fine::Term>(); | ||
| terms.reserve(size); | ||
|
|
||
| for (Py_ssize_t i = 0; i < size; i++) { | ||
| auto py_line = PyList_GetItem(py_lines, i); | ||
| raise_formatting_error_if_failed(py_line); | ||
|
|
||
| Py_ssize_t size; | ||
| auto buffer = PyUnicode_AsUTF8AndSize(py_line, &size); | ||
| raise_formatting_error_if_failed(buffer); | ||
|
|
||
| // The buffer is immutable and lives as long as the Python object, | ||
| // so we create the term as a resource binary to make it zero-copy. | ||
| Py_IncRef(py_line); | ||
| auto ex_object_resource = fine::make_resource<PyObjectResource>(py_line); | ||
| auto binary_term = | ||
| fine::make_resource_binary(env, ex_object_resource, buffer, size); | ||
|
|
||
| terms.push_back(binary_term); | ||
| } | ||
|
|
||
| return ExError(std::move(terms)); | ||
| } | ||
|
|
||
| void raise_py_error(ErlNifEnv *env) { | ||
| fine::raise(env, build_py_error_from_current(env)); | ||
| } | ||
|
|
||
| void raise_if_failed(ErlNifEnv *env, PyObjectPtr py_object) { | ||
|
|
@@ -284,6 +358,19 @@ ERL_NIF_TERM py_str_to_binary_term(ErlNifEnv *env, PyObjectPtr py_object) { | |
| return fine::make_resource_binary(env, ex_object_resource, buffer, size); | ||
| } | ||
|
|
||
| ERL_NIF_TERM py_bytes_to_binary_term(ErlNifEnv *env, PyObjectPtr py_object) { | ||
| Py_ssize_t size; | ||
| char *buffer; | ||
| auto result = PyBytes_AsStringAndSize(py_object, &buffer, &size); | ||
| raise_if_failed(env, result); | ||
|
|
||
| // The buffer is immutable and lives as long as the Python object, | ||
| // so we create the term as a resource binary to make it zero-copy. | ||
| Py_IncRef(py_object); | ||
| auto ex_object_resource = fine::make_resource<PyObjectResource>(py_object); | ||
| return fine::make_resource_binary(env, ex_object_resource, buffer, size); | ||
| } | ||
|
|
||
| fine::Ok<> init(ErlNifEnv *env, std::string python_dl_path, | ||
| ErlNifBinary python_home_path, | ||
| ErlNifBinary python_executable_path, | ||
|
|
@@ -785,50 +872,6 @@ ExObject object_repr(ErlNifEnv *env, ExObject ex_object) { | |
|
|
||
| FINE_NIF(object_repr, ERL_NIF_DIRTY_JOB_CPU_BOUND); | ||
|
|
||
| fine::Term format_exception(ErlNifEnv *env, ExError error) { | ||
| ensure_initialized(); | ||
| auto gil_guard = PyGILGuard(); | ||
|
|
||
| auto py_traceback_module = PyImport_ImportModule("traceback"); | ||
| raise_if_failed(env, py_traceback_module); | ||
| auto py_traceback_module_guard = PyDecRefGuard(py_traceback_module); | ||
|
|
||
| auto format_exception = | ||
| PyObject_GetAttrString(py_traceback_module, "format_exception"); | ||
| raise_if_failed(env, format_exception); | ||
| auto format_exception_guard = PyDecRefGuard(format_exception); | ||
|
|
||
| auto py_type = error.type.resource->py_object; | ||
| auto py_value = error.value.resource->py_object; | ||
| auto py_traceback = error.traceback.resource->py_object; | ||
|
|
||
| auto format_exception_args = PyTuple_Pack(3, py_type, py_value, py_traceback); | ||
| raise_if_failed(env, format_exception_args); | ||
| auto format_exception_args_guard = PyDecRefGuard(format_exception_args); | ||
|
|
||
| auto py_lines = PyObject_Call(format_exception, format_exception_args, NULL); | ||
| raise_if_failed(env, py_lines); | ||
| auto py_lines_guard = PyDecRefGuard(py_lines); | ||
|
|
||
| auto size = PyList_Size(py_lines); | ||
| raise_if_failed(env, size); | ||
|
|
||
| auto terms = std::vector<ERL_NIF_TERM>(); | ||
| terms.reserve(size); | ||
|
|
||
| for (Py_ssize_t i = 0; i < size; i++) { | ||
| auto py_line = PyList_GetItem(py_lines, i); | ||
| raise_if_failed(env, py_line); | ||
|
|
||
| terms.push_back(py_str_to_binary_term(env, py_line)); | ||
| } | ||
|
|
||
| return enif_make_list_from_array(env, terms.data(), | ||
| static_cast<unsigned int>(size)); | ||
| } | ||
|
|
||
| FINE_NIF(format_exception, ERL_NIF_DIRTY_JOB_CPU_BOUND); | ||
|
|
||
| fine::Term decode_once(ErlNifEnv *env, ExObject ex_object) { | ||
| ensure_initialized(); | ||
| auto gil_guard = PyGILGuard(); | ||
|
|
@@ -987,16 +1030,7 @@ fine::Term decode_once(ErlNifEnv *env, ExObject ex_object) { | |
| auto is_bytes = PyObject_IsInstance(py_object, py_bytes_type); | ||
| raise_if_failed(env, is_bytes); | ||
| if (is_bytes) { | ||
| Py_ssize_t size; | ||
| char *buffer; | ||
| auto result = PyBytes_AsStringAndSize(py_object, &buffer, &size); | ||
| raise_if_failed(env, result); | ||
|
|
||
| // The buffer is immutable and lives as long as the Python object, | ||
| // so we create the term as a resource binary to make it zero-copy. | ||
| Py_IncRef(py_object); | ||
| auto ex_object_resource = fine::make_resource<PyObjectResource>(py_object); | ||
| return fine::make_resource_binary(env, ex_object_resource, buffer, size); | ||
| return py_bytes_to_binary_term(env, py_object); | ||
| } | ||
|
|
||
| auto py_set_type = PyDict_GetItemString(py_builtins, "set"); | ||
|
|
@@ -1461,6 +1495,86 @@ eval(ErlNifEnv *env, ErlNifBinary code, std::string code_md5, | |
|
|
||
| FINE_NIF(eval, ERL_NIF_DIRTY_JOB_CPU_BOUND); | ||
|
|
||
| std::variant<fine::Ok<fine::Term>, fine::Error<std::string, ExError>> | ||
| dump_object(ErlNifEnv *env, ExObject ex_object) { | ||
| ensure_initialized(); | ||
| auto gil_guard = PyGILGuard(); | ||
|
|
||
| std::string pickle_module_name; | ||
| PyObjectPtr py_pickle; | ||
|
|
||
| py_pickle = PyImport_ImportModule("cloudpickle"); | ||
| if (py_pickle != NULL) { | ||
| pickle_module_name = "cloudpickle"; | ||
| } else { | ||
| // If importing fails, we ignore the error and fallback to the pickle | ||
| // module. | ||
| PyErr_Clear(); | ||
| py_pickle = PyImport_ImportModule("pickle"); | ||
| raise_if_failed(env, py_pickle); | ||
| pickle_module_name = "pickle"; | ||
| } | ||
| auto py_pickle_guard = PyDecRefGuard(py_pickle); | ||
|
|
||
| auto py_dumps = PyObject_GetAttrString(py_pickle, "dumps"); | ||
| raise_if_failed(env, py_dumps); | ||
| auto py_dumps_guard = PyDecRefGuard(py_dumps); | ||
|
|
||
| auto py_dumps_args = PyTuple_Pack(1, ex_object.resource->py_object); | ||
| raise_if_failed(env, py_dumps_args); | ||
| auto py_dumps_args_guard = PyDecRefGuard(py_dumps_args); | ||
|
|
||
| auto py_dump_bytes = PyObject_Call(py_dumps, py_dumps_args, NULL); | ||
| if (py_dump_bytes == NULL) { | ||
| return fine::Error<std::string, ExError>(pickle_module_name, | ||
| build_py_error_from_current(env)); | ||
| } | ||
| raise_if_failed(env, py_dump_bytes); | ||
| auto py_bytes_guard = PyDecRefGuard(py_dump_bytes); | ||
|
|
||
| return fine::Ok<fine::Term>(py_bytes_to_binary_term(env, py_dump_bytes)); | ||
| } | ||
|
|
||
| FINE_NIF(dump_object, ERL_NIF_DIRTY_JOB_CPU_BOUND); | ||
|
|
||
| ExObject load_object(ErlNifEnv *env, ErlNifBinary binary) { | ||
| ensure_initialized(); | ||
| auto gil_guard = PyGILGuard(); | ||
|
|
||
| auto py_pickle = PyImport_ImportModule("pickle"); | ||
| raise_if_failed(env, py_pickle); | ||
| auto py_pickle_guard = PyDecRefGuard(py_pickle); | ||
|
|
||
| auto py_loads = PyObject_GetAttrString(py_pickle, "loads"); | ||
| raise_if_failed(env, py_loads); | ||
| auto py_loads_guard = PyDecRefGuard(py_loads); | ||
|
|
||
| auto py_bytes = PyBytes_FromStringAndSize( | ||
| reinterpret_cast<const char *>(binary.data), binary.size); | ||
| raise_if_failed(env, py_bytes); | ||
| auto py_bytes_guard = PyDecRefGuard(py_bytes); | ||
|
|
||
| auto py_loads_args = PyTuple_Pack(1, py_bytes); | ||
| raise_if_failed(env, py_loads_args); | ||
| auto py_loads_args_guard = PyDecRefGuard(py_loads_args); | ||
|
|
||
| auto py_object = PyObject_Call(py_loads, py_loads_args, NULL); | ||
| raise_if_failed(env, py_object); | ||
|
|
||
| return ExObject(fine::make_resource<PyObjectResource>(py_object)); | ||
| } | ||
|
|
||
| FINE_NIF(load_object, ERL_NIF_DIRTY_JOB_CPU_BOUND); | ||
|
|
||
| fine::ResourcePtr<GCNotifier> create_gc_notifier(ErlNifEnv *env, ErlNifPid pid, | ||
| fine::Term term) { | ||
| auto message_env = enif_alloc_env(); | ||
| auto message_term = enif_make_copy(message_env, term); | ||
| return fine::make_resource<GCNotifier>(pid, message_env, message_term); | ||
| } | ||
|
|
||
| FINE_NIF(create_gc_notifier, 0); | ||
|
|
||
| } // namespace pythonx | ||
|
|
||
| FINE_INIT("Elixir.Pythonx.NIF"); | ||
|
|
@@ -1505,6 +1619,9 @@ extern "C" void pythonx_handle_io_write(const char *message, | |
| ErlNifPid janitor_pid; | ||
| if (enif_whereis_pid(caller_env, janitor_name, &janitor_pid)) { | ||
| auto device = type == 0 ? eval_info.stdout_device : eval_info.stderr_device; | ||
| // Copy the device term is from a differnet env, so we copy it into | ||
| // the message env, otherwise we may run into unexpected behaviour. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Which sort of unexpected behaviour?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So far it has been working fine. Once I was testing remote eval, where |
||
| device = enif_make_copy(env, device); | ||
|
|
||
| auto msg = fine::encode(env, std::make_tuple(pythonx::atoms::output, | ||
| std::string(message), device)); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.