/* * %CopyrightBegin% * * Copyright Ericsson AB 2012-2016. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * %CopyrightEnd% */ #include "erl_driver.h" #define THR_MSG_BLAST_NO_PROCS 10 #define THR_MSG_BLAST_NO_SENDS_PER_PROC 10000 #define THR_MSG_BLAST_THREADS 32 static void stop(ErlDrvData drv_data); static ErlDrvData start(ErlDrvPort port, char *command); static ErlDrvSSizeT control(ErlDrvData drv_data, unsigned int command, char *buf, ErlDrvSizeT len, char **rbuf, ErlDrvSizeT rlen); static ErlDrvEntry thr_msg_blast_drv_entry = { NULL /* init */, start, stop, NULL /* output */, NULL /* ready_input */, NULL /* ready_output */, "thr_msg_blast_drv", NULL /* finish */, NULL /* handle */, control, NULL /* timeout */, NULL /* outputv */, NULL /* ready_async */, NULL /* flush */, NULL /* call */, NULL /* event */, ERL_DRV_EXTENDED_MARKER, ERL_DRV_EXTENDED_MAJOR_VERSION, ERL_DRV_EXTENDED_MINOR_VERSION, ERL_DRV_FLAG_USE_PORT_LOCKING, NULL /* handle2 */, NULL /* handle_monitor */ }; typedef struct { ErlDrvPort port; ErlDrvTermData td_port; ErlDrvTermData hi; ErlDrvTid tid[THR_MSG_BLAST_THREADS]; int no_thrs; ErlDrvTermData proc[THR_MSG_BLAST_NO_PROCS]; int no_procs; } thr_msg_blast_data_t; DRIVER_INIT(thr_msg_blast_drv) { return &thr_msg_blast_drv_entry; } static void stop(ErlDrvData drv_data) { int i; thr_msg_blast_data_t *tmbd = (thr_msg_blast_data_t *) drv_data; for (i = 0; i < tmbd->no_thrs; i++) erl_drv_thread_join(tmbd->tid[i], NULL); driver_free((void *) tmbd); } static ErlDrvData start(ErlDrvPort port, char *command) { thr_msg_blast_data_t *tmbd; tmbd = driver_alloc(sizeof(thr_msg_blast_data_t)); if (!tmbd) return ERL_DRV_ERROR_GENERAL; tmbd->port = port; tmbd->td_port = driver_mk_port(port); tmbd->hi = driver_mk_atom("hi"); tmbd->no_thrs = 0; tmbd->no_procs = 1; tmbd->proc[0] = driver_caller(port); return (ErlDrvData) tmbd; } static void *thread(void *); static ErlDrvSSizeT control(ErlDrvData drv_data, unsigned int command, char *buf, ErlDrvSizeT len, char **rbuf, ErlDrvSizeT rlen) { thr_msg_blast_data_t *tmbd = (thr_msg_blast_data_t *) drv_data; char *res_str = "error"; if (tmbd->no_procs >= THR_MSG_BLAST_NO_PROCS) { int i; for (i = 0; i < tmbd->no_thrs; i++) erl_drv_thread_join(tmbd->tid[i], NULL); tmbd->no_thrs = 0; res_str = "done"; } else { tmbd->proc[tmbd->no_procs++] = driver_caller(tmbd->port); if (tmbd->no_procs == THR_MSG_BLAST_NO_PROCS) { for (tmbd->no_thrs = 0; tmbd->no_thrs < THR_MSG_BLAST_THREADS; tmbd->no_thrs++) { int res = erl_drv_thread_create("test", &tmbd->tid[tmbd->no_thrs], thread, tmbd, NULL); if (res != 0) { driver_failure_posix(tmbd->port, res); goto done; } } } res_str = "receiver"; } done: { ErlDrvSSizeT res_len = strlen(res_str); if (res_len > rlen) { char *abuf = driver_alloc(sizeof(char)*res_len); if (!abuf) return 0; *rbuf = abuf; } memcpy((void *) *rbuf, (void *) res_str, res_len); return res_len; } } static void *thread(void *varg) { int s, p; thr_msg_blast_data_t *tmbd = (thr_msg_blast_data_t *) varg; ErlDrvTermData spec[] = { ERL_DRV_PORT, tmbd->td_port, ERL_DRV_ATOM, tmbd->hi, ERL_DRV_TUPLE, 2 }; for (s = 0; s < THR_MSG_BLAST_NO_SENDS_PER_PROC; s++) { for (p = 0; p < THR_MSG_BLAST_NO_PROCS; p++) { int res = erl_drv_send_term(tmbd->td_port, tmbd->proc[p], spec, sizeof(spec)/sizeof(spec[0])); if (p == 0 && res <= 0) abort(); /* Could not send to creator */ } } return NULL; }