From 4b98e710b9c45481c1cdc7a4ee68f7ce7fca908a Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Mon, 13 Jul 2015 15:38:41 +0200 Subject: erts: Add support for asynchronous open_port OTP-13086 --- erts/emulator/beam/io.c | 60 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) (limited to 'erts/emulator/beam/io.c') diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index fdd26fcc4b..409df846e9 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -84,6 +84,7 @@ static void deliver_result(Eterm sender, Eterm pid, Eterm res); static int init_driver(erts_driver_t *, ErlDrvEntry *, DE_Handle *); static void terminate_port(Port *p); static void pdl_init(void); +static int driver_failure_term(ErlDrvPort ix, Eterm term, int eof); #ifdef ERTS_SMP static void driver_monitor_lock_pdl(Port *p); static void driver_monitor_unlock_pdl(Port *p); @@ -383,6 +384,7 @@ static Port *create_port(char *name, ERTS_PTMR_INIT(prt); erts_port_task_handle_init(&prt->timeout_task); prt->psd = NULL; + prt->async_open_port = NULL; prt->drv_data = (SWord) 0; prt->os_pid = -1; @@ -2732,6 +2734,61 @@ erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp) port_sig_link); } +static void +init_ack_send_reply(Port *port, Eterm resp) +{ + + if (!is_internal_port(resp)) { + Process *rp = erts_proc_lookup_raw(port->async_open_port->to); + erts_smp_proc_lock(rp, ERTS_PROC_LOCK_LINK); + erts_remove_link(&ERTS_P_LINKS(port), port->async_open_port->to); + erts_remove_link(&ERTS_P_LINKS(rp), port->common.id); + erts_smp_proc_unlock(rp, ERTS_PROC_LOCK_LINK); + } + port_sched_op_reply(port->async_open_port->to, + port->async_open_port->ref, + resp); + + erts_free(ERTS_ALC_T_PRTSD, port->async_open_port); + port->async_open_port = NULL; +} + +void +erl_drv_init_ack(ErlDrvPort ix, ErlDrvData res) { + Port *port = erts_drvport2port(ix); + SWord err_type = (SWord)res; + Eterm resp; + + if (port == ERTS_INVALID_ERL_DRV_PORT && port->async_open_port) + return; + + if (port->async_open_port) { + switch(err_type) { + case -3: + resp = am_badarg; + break; + case -2: { + char *str = erl_errno_id(errno); + resp = erts_atom_put((byte *) str, strlen(str), + ERTS_ATOM_ENC_LATIN1, 1); + break; + } + case -1: + resp = am_einval; + break; + default: + resp = port->common.id; + break; + } + + init_ack_send_reply(port, resp); + + if (err_type == -1 || err_type == -2 || err_type == -3) + driver_failure_term(ix, am_normal, 0); + port->drv_data = err_type; + } +} + void erts_init_io(int port_tab_size, int port_tab_size_ignore_files, int legacy_port_tab) @@ -6972,6 +7029,9 @@ driver_failure_term(ErlDrvPort ix, Eterm term, int eof) if (prt == ERTS_INVALID_ERL_DRV_PORT) return -1; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + + if (prt->async_open_port) + init_ack_send_reply(prt, prt->common.id); if (eof) flush_linebuf_messages(prt, state); if (state & ERTS_PORT_SFLG_CLOSING) { -- cgit v1.2.3 From 91c1876f70870f450f7e8fd3b02f2396067e3cb8 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Tue, 14 Jul 2015 11:07:33 +0200 Subject: erts: Add erl_drv_set_pid OTP-13087 --- erts/emulator/beam/io.c | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'erts/emulator/beam/io.c') diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 409df846e9..2a3759212e 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -2789,6 +2789,17 @@ erl_drv_init_ack(ErlDrvPort ix, ErlDrvData res) { } } +void +erl_drv_set_os_pid(ErlDrvPort ix, ErlDrvSInt pid) { + Port *port = erts_drvport2port(ix); + + if (port == ERTS_INVALID_ERL_DRV_PORT) + return; + + port->os_pid = (SWord)pid; + +} + void erts_init_io(int port_tab_size, int port_tab_size_ignore_files, int legacy_port_tab) -- cgit v1.2.3 From 14c7fefd51be035a44bfe42127fb4b9df92d760b Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Mon, 6 Jul 2015 17:13:52 +0200 Subject: erts: Create forker process for spawn driver Instead of forking from the beam process, we create a separate process in which all forks are done. This has several advantages: 1) performance: * don't have to close all fd's in the world * fork only has to copy stuff from a small process * work is done in a completely seperate process * a 3x performance increase has been measured, can be made even greater (10x) if we cache the environment in child setup 2) stability * the exec is done in another process than beam, which means that if the file that we exec to is on an nfs that is not available right now we will not block a scheduler until the nfs returns. 3) simplicity * don't have to deal with SIGCHLD in the erts Unfortunately, this solution also implies some badness. 1) There will always be a seperate process running together with beam on unix. This could be confusing and undesirable. 2) We have to transfer the entire environment to child_setup for each command. OTP-13088 --- erts/emulator/beam/io.c | 67 +++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 59 insertions(+), 8 deletions(-) (limited to 'erts/emulator/beam/io.c') diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 2a3759212e..6ec7a9ba1e 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -54,6 +54,7 @@ extern ErlDrvEntry fd_driver_entry; extern ErlDrvEntry vanilla_driver_entry; extern ErlDrvEntry spawn_driver_entry; +extern ErlDrvEntry forker_driver_entry; extern ErlDrvEntry *driver_tab[]; /* table of static drivers, only used during initialization */ erts_driver_t *driver_list; /* List of all drivers, static and dynamic. */ @@ -71,6 +72,7 @@ const Port erts_invalid_port = {{ERTS_INVALID_PORT}}; erts_driver_t vanilla_driver; erts_driver_t spawn_driver; +erts_driver_t forker_driver; erts_driver_t fd_driver; int erts_port_synchronous_ops = 0; @@ -306,12 +308,9 @@ static Port *create_port(char *name, size_t port_size, busy_port_queue_size, size; erts_aint32_t state = ERTS_PORT_SFLG_CONNECTED; erts_aint32_t x_pts_flgs = 0; -#ifdef DEBUG - /* Make sure the debug flags survives until port is freed */ - state |= ERTS_PORT_SFLG_PORT_DEBUG; -#endif #ifdef ERTS_SMP + ErtsRunQueue *runq; if (!driver_lock) { /* Align size for mutex following port struct */ port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port)); @@ -321,6 +320,12 @@ static Port *create_port(char *name, #endif port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port)); +#ifdef DEBUG + /* Make sure the debug flags survives until port is freed */ + state |= ERTS_PORT_SFLG_PORT_DEBUG; +#endif + + busy_port_queue_size = ((driver->flags & ERL_DRV_FLAG_NO_BUSY_MSGQ) ? 0 @@ -356,8 +361,12 @@ static Port *create_port(char *name, p += sizeof(erts_mtx_t); state |= ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK; } - erts_smp_atomic_set_nob(&prt->run_queue, - (erts_aint_t) erts_get_runq_current(NULL)); + if (erts_get_scheduler_data()) + runq = erts_get_runq_current(NULL); + else + runq = ERTS_RUNQ_IX(0); + erts_smp_atomic_set_nob(&prt->run_queue, (erts_aint_t) runq); + prt->xports = NULL; #else erts_atomic32_init_nob(&prt->refc, 1); @@ -1535,6 +1544,26 @@ erts_schedule_proc2port_signal(Process *c_p, return ERTS_PORT_OP_SCHEDULED; } +static int +erts_schedule_port2port_signal(Eterm port_num, ErtsProc2PortSigData *sigdp, + int task_flags, + ErtsProc2PortSigCallback callback) +{ + Port *prt = erts_port_lookup_raw(port_num); + + if (!prt) + return -1; + + sigdp->caller = ERTS_INVALID_PID; + + return erts_port_task_schedule(prt->common.id, + NULL, + ERTS_PORT_TASK_PROC_SIG, + sigdp, + callback, + task_flags); +} + static ERTS_INLINE void send_badsig(Port *prt) { ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND; @@ -2862,6 +2891,7 @@ void erts_init_io(int port_tab_size, init_driver(&fd_driver, &fd_driver_entry, NULL); init_driver(&vanilla_driver, &vanilla_driver_entry, NULL); init_driver(&spawn_driver, &spawn_driver_entry, NULL); + init_driver(&forker_driver, &forker_driver_entry, NULL); erts_init_static_drivers(); for (dp = driver_tab; *dp != NULL; dp++) erts_add_driver_entry(*dp, NULL, 1); @@ -2923,6 +2953,7 @@ void erts_lcnt_enable_io_lock_count(int enable) { lcnt_enable_drv_lock_count(&vanilla_driver, enable); lcnt_enable_drv_lock_count(&spawn_driver, enable); + lcnt_enable_drv_lock_count(&forker_driver, enable); lcnt_enable_drv_lock_count(&fd_driver, enable); /* enable lock counting in all drivers */ for (dp = driver_list; dp; dp = dp->next) { @@ -3964,7 +3995,7 @@ port_sig_control(Port *prt, Uint hsz, rsz; int control_flags; - rp = erts_proc_lookup_raw(sigdp->caller); + rp = sigdp->caller == ERTS_INVALID_PID ? NULL : erts_proc_lookup_raw(sigdp->caller); if (!rp) goto done; @@ -4007,7 +4038,8 @@ port_sig_control(Port *prt, /* failure */ - port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg); + if (sigdp->caller != ERTS_INVALID_PID) + port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg); done: @@ -4017,6 +4049,23 @@ done: return ERTS_PORT_REDS_CONTROL; } +/* + * This is an asynchronous control call. I.e. it will not return anything + * to the caller. + */ +int +erl_drv_port_control(Eterm port_num, char cmd, char* buff, ErlDrvSizeT size) +{ + ErtsProc2PortSigData *sigdp = erts_port_task_alloc_p2p_sig_data(); + + sigdp->flags = ERTS_P2P_SIG_TYPE_CONTROL | ERTS_P2P_SIG_DATA_FLG_REPLY; + sigdp->u.control.binp = NULL; + sigdp->u.control.command = cmd; + sigdp->u.control.bufp = buff; + sigdp->u.control.size = size; + + return erts_schedule_port2port_signal(port_num, sigdp, 0, port_sig_control); +} ErtsPortOpResult erts_port_control(Process* c_p, @@ -4794,6 +4843,8 @@ print_port_info(Port *p, int to, void *arg) erts_print(to, arg, "Port is a file: %s\n",p->name); } else if (p->drv_ptr == &spawn_driver) { erts_print(to, arg, "Port controls external process: %s\n",p->name); + } else if (p->drv_ptr == &forker_driver) { + erts_print(to, arg, "Port controls forker process: %s\n",p->name); } else { erts_print(to, arg, "Port controls linked-in driver: %s\n",p->name); } -- cgit v1.2.3 From 05823de18bd48b70b398a6b6fd756a0f71587283 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Thu, 27 Aug 2015 10:53:07 +0200 Subject: erts: Fix forker driver ifdefs for win32 --- erts/emulator/beam/io.c | 10 ++++++++++ 1 file changed, 10 insertions(+) (limited to 'erts/emulator/beam/io.c') diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 6ec7a9ba1e..fc247a3260 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -54,7 +54,9 @@ extern ErlDrvEntry fd_driver_entry; extern ErlDrvEntry vanilla_driver_entry; extern ErlDrvEntry spawn_driver_entry; +#ifndef __WIN32__ extern ErlDrvEntry forker_driver_entry; +#endif extern ErlDrvEntry *driver_tab[]; /* table of static drivers, only used during initialization */ erts_driver_t *driver_list; /* List of all drivers, static and dynamic. */ @@ -72,7 +74,9 @@ const Port erts_invalid_port = {{ERTS_INVALID_PORT}}; erts_driver_t vanilla_driver; erts_driver_t spawn_driver; +#ifndef __WIN32__ erts_driver_t forker_driver; +#endif erts_driver_t fd_driver; int erts_port_synchronous_ops = 0; @@ -2891,7 +2895,9 @@ void erts_init_io(int port_tab_size, init_driver(&fd_driver, &fd_driver_entry, NULL); init_driver(&vanilla_driver, &vanilla_driver_entry, NULL); init_driver(&spawn_driver, &spawn_driver_entry, NULL); +#ifndef __WIN32__ init_driver(&forker_driver, &forker_driver_entry, NULL); +#endif erts_init_static_drivers(); for (dp = driver_tab; *dp != NULL; dp++) erts_add_driver_entry(*dp, NULL, 1); @@ -2953,7 +2959,9 @@ void erts_lcnt_enable_io_lock_count(int enable) { lcnt_enable_drv_lock_count(&vanilla_driver, enable); lcnt_enable_drv_lock_count(&spawn_driver, enable); +#ifndef __WIN32__ lcnt_enable_drv_lock_count(&forker_driver, enable); +#endif lcnt_enable_drv_lock_count(&fd_driver, enable); /* enable lock counting in all drivers */ for (dp = driver_list; dp; dp = dp->next) { @@ -4843,8 +4851,10 @@ print_port_info(Port *p, int to, void *arg) erts_print(to, arg, "Port is a file: %s\n",p->name); } else if (p->drv_ptr == &spawn_driver) { erts_print(to, arg, "Port controls external process: %s\n",p->name); +#ifndef __WIN32__ } else if (p->drv_ptr == &forker_driver) { erts_print(to, arg, "Port controls forker process: %s\n",p->name); +#endif } else { erts_print(to, arg, "Port controls linked-in driver: %s\n",p->name); } -- cgit v1.2.3 From e56d78b761680d83ab61045a62d4fc7bdb312b3c Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Mon, 28 Sep 2015 11:02:40 +0200 Subject: erts: Fix memory leak at async open port When an async open port fails early, it would sometimes leak memory. --- erts/emulator/beam/io.c | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'erts/emulator/beam/io.c') diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index fc247a3260..df8a213a1d 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -479,6 +479,11 @@ erts_port_free(Port *prt) erts_port_task_fini_sched(&prt->sched); + if (prt->async_open_port) { + erts_free(ERTS_ALC_T_PRTSD, prt->async_open_port); + prt->async_open_port = NULL; + } + #ifdef ERTS_SMP ASSERT(prt->lock); if (state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK) -- cgit v1.2.3 From b96e62d346eea60b2300dce22d8d892387de4681 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Wed, 14 Oct 2015 09:17:58 +0200 Subject: erts: It is not possible to exit the forker driver --- erts/emulator/beam/io.c | 5 +++++ 1 file changed, 5 insertions(+) (limited to 'erts/emulator/beam/io.c') diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index df8a213a1d..8d9199162c 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -2408,6 +2408,11 @@ erts_port_exit(Process *c_p, | ERTS_PORT_SIG_FLG_BROKEN_LINK | ERTS_PORT_SIG_FLG_FORCE_SCHED)) == 0); +#ifndef __WIN32__ + if (prt->drv_ptr == &forker_driver) + return ERTS_PORT_OP_DROPPED; +#endif + if (!(flags & ERTS_PORT_SIG_FLG_FORCE_SCHED)) { ErtsTryImmDrvCallState try_call_state = ERTS_INIT_TRY_IMM_DRV_CALL_STATE(c_p, -- cgit v1.2.3