From e732293edc5a970885d9eb362dcaddf3548f84a4 Mon Sep 17 00:00:00 2001 From: Dan Gudmundsson Date: Tue, 20 Oct 2015 10:19:16 +0200 Subject: wx: Use only one ring buffer for command queue Avoid copying between command queues, cleaner, faster and safer implementation. --- lib/wx/c_src/wxe_helpers.cpp | 58 +++++++++++++++++++++---- lib/wx/c_src/wxe_helpers.h | 4 ++ lib/wx/c_src/wxe_impl.cpp | 101 ++++++++++++++----------------------------- lib/wx/c_src/wxe_impl.h | 4 +- 4 files changed, 87 insertions(+), 80 deletions(-) (limited to 'lib') diff --git a/lib/wx/c_src/wxe_helpers.cpp b/lib/wx/c_src/wxe_helpers.cpp index cc57374d5b..1696b8bd50 100644 --- a/lib/wx/c_src/wxe_helpers.cpp +++ b/lib/wx/c_src/wxe_helpers.cpp @@ -61,6 +61,7 @@ wxeFifo::wxeFifo(unsigned int sz) m_max = sz; m_n = 0; m_first = 0; + cb_start = 0; m_old = NULL; for(unsigned int i = 0; i < sz; i++) { m_q[i].buffer = NULL; @@ -76,15 +77,30 @@ wxeFifo::~wxeFifo() { wxeCommand * wxeFifo::Get() { unsigned int pos; - if(m_n > 0) { + do { + if(m_n <= 0) + return NULL; + pos = m_first++; m_n--; m_first %= m_max; - return &m_q[pos]; - } - return NULL; + } while(m_q[pos].op == -1); + return &m_q[pos]; +} + +wxeCommand * wxeFifo::Peek(unsigned int *i) +{ + unsigned int pos; + do { + if(*i >= m_n || m_n <= 0) + return NULL; + pos = (m_first+*i) % m_max; + (*i)++; + } while(m_q[pos].op == -1); + return &m_q[pos]; } + void wxeFifo::Add(int fc, char * cbuf,int buflen, wxe_data *sd) { unsigned int pos; @@ -140,10 +156,12 @@ void wxeFifo::Append(wxeCommand *orig) pos = (m_first + m_n) % m_max; m_n++; + curr = &m_q[pos]; + curr->op = orig->op; + if(curr->op == -1) return; curr->caller = orig->caller; curr->port = orig->port; - curr->op = orig->op; curr->len = orig->len; curr->bin[0] = orig->bin[0]; curr->bin[1] = orig->bin[1]; @@ -171,17 +189,19 @@ void wxeFifo::Realloc() wxeCommand * old = m_q; wxeCommand * queue = (wxeCommand *)driver_alloc(new_sz*sizeof(wxeCommand)); + // fprintf(stderr, "\r\nrealloc qsz %d\r\n", new_sz);fflush(stderr); + m_max=new_sz; m_first = 0; m_n=0; m_q = queue; for(i=0; i < n; i++) { - unsigned int pos = i+first; - if(old[pos%max].op >= 0) { - Append(&old[pos%max]); - } + unsigned int pos = (i+first)%max; + if(old[pos].op >= 0) + Append(&old[pos]); } + for(i = m_n; i < new_sz; i++) { // Reset the rest m_q[i].buffer = NULL; m_q[i].op = -1; @@ -190,6 +210,26 @@ void wxeFifo::Realloc() m_old = old; } +// Strip end of queue if ops are already taken care of, avoids reallocs +void wxeFifo::Strip() +{ + while((m_n > 0) && (m_q[(m_first + m_n - 1)%m_max].op == -1)) { + m_n--; + } +} + +unsigned int wxeFifo::Cleanup(unsigned int def) +{ + if(m_old) { + driver_free(m_old); + m_old = NULL; + // Realloced we need to start from the beginning + return 0; + } else { + return def; + } +} + /* **************************************************************************** * TreeItemData * ****************************************************************************/ diff --git a/lib/wx/c_src/wxe_helpers.h b/lib/wx/c_src/wxe_helpers.h index 4f9d9ca9c3..ff949e332b 100644 --- a/lib/wx/c_src/wxe_helpers.h +++ b/lib/wx/c_src/wxe_helpers.h @@ -67,9 +67,13 @@ class wxeFifo { void Append(wxeCommand *Other); wxeCommand * Get(); + wxeCommand * Peek(unsigned int *item); void Realloc(); + void Strip(); + unsigned int Cleanup(unsigned int peek=0); + unsigned int cb_start; unsigned int m_max; unsigned int m_first; unsigned int m_n; diff --git a/lib/wx/c_src/wxe_impl.cpp b/lib/wx/c_src/wxe_impl.cpp index 5b34f08704..f81d0bbbd9 100644 --- a/lib/wx/c_src/wxe_impl.cpp +++ b/lib/wx/c_src/wxe_impl.cpp @@ -57,10 +57,8 @@ extern ErlDrvTermData init_caller; extern int wxe_status; wxeFifo * wxe_queue = NULL; -wxeFifo * wxe_queue_cb_saved = NULL; unsigned int wxe_needs_signal = 0; // inside batch if larger than 0 -unsigned int wxe_cb_invoked = 0; /* ************************************************************ * Commands from erlang @@ -123,11 +121,10 @@ bool WxeApp::OnInit() { global_me = new wxeMemEnv(); - wxe_queue = new wxeFifo(1000); - wxe_queue_cb_saved = new wxeFifo(200); + wxe_queue = new wxeFifo(2000); cb_buff = NULL; recurse_level = 0; - delayed_delete = new wxeFifo(10); + delayed_delete = new wxeFifo(100); delayed_cleanup = new wxList; wxe_ps_init2(); @@ -173,7 +170,6 @@ void WxeApp::shutdown(wxeMetaCommand& Ecmd) { wxe_status = WXE_EXITING; ExitMainLoop(); delete wxe_queue; - delete wxe_queue_cb_saved; } void WxeApp::dummy_close(wxEvent& Ev) { @@ -230,11 +226,10 @@ void handle_event_callback(ErlDrvPort port, ErlDrvTermData process) // Should we be able to handle commands when recursing? probably // fprintf(stderr, "\r\nCB EV Start %lu \r\n", process);fflush(stderr); app->recurse_level++; - app->dispatch_cb(wxe_queue, wxe_queue_cb_saved, process); + app->dispatch_cb(wxe_queue, process); app->recurse_level--; // fprintf(stderr, "CB EV done %lu \r\n", process);fflush(stderr); driver_demonitor_process(port, &monitor); - wxe_cb_invoked = 1; } } @@ -242,16 +237,11 @@ void WxeApp::dispatch_cmds() { if(wxe_status != WXE_INITIATED) return; - do { - wxe_cb_invoked = 0; - recurse_level++; - // fprintf(stderr, "\r\ndispatch_saved 0 \r\n");fflush(stderr); - int level = dispatch(wxe_queue_cb_saved, 0, WXE_STORED); - // fprintf(stderr, "\r\ndispatch_normal %d\r\n", level);fflush(stderr); - dispatch(wxe_queue, level, WXE_NORMAL); - // fprintf(stderr, "\r\ndispatch_done \r\n");fflush(stderr); - recurse_level--; - } while(wxe_cb_invoked); + recurse_level++; + // fprintf(stderr, "\r\ndispatch_normal %d\r\n", level);fflush(stderr); + dispatch(wxe_queue); + // fprintf(stderr, "\r\ndispatch_done \r\n");fflush(stderr); + recurse_level--; // Cleanup old memenv's and deleted objects if(recurse_level == 0) { @@ -260,6 +250,7 @@ void WxeApp::dispatch_cmds() wxe_dispatch(*curr); curr->Delete(); } + delayed_delete->Cleanup(); if(delayed_cleanup->size() > 0) for( wxList::compatibility_iterator node = delayed_cleanup->GetFirst(); node; @@ -269,28 +260,19 @@ void WxeApp::dispatch_cmds() destroyMemEnv(*event); delete event; } - if(wxe_queue_cb_saved->m_old) { - driver_free(wxe_queue_cb_saved->m_old); - wxe_queue_cb_saved->m_old = NULL; - } - if(delayed_delete->m_old) { - driver_free(delayed_delete->m_old); - delayed_delete->m_old = NULL; - } } } -int WxeApp::dispatch(wxeFifo * batch, int blevel, int list_type) +int WxeApp::dispatch(wxeFifo * batch) { int ping = 0; + int blevel = 0; wxeCommand *event; - if(list_type == WXE_NORMAL) erl_drv_mutex_lock(wxe_batch_locker_m); + erl_drv_mutex_lock(wxe_batch_locker_m); while(true) { while((event = batch->Get()) != NULL) { - if(list_type == WXE_NORMAL) erl_drv_mutex_unlock(wxe_batch_locker_m); + erl_drv_mutex_unlock(wxe_batch_locker_m); switch(event->op) { - case -1: - break; case WXE_BATCH_END: {--blevel; } break; @@ -321,20 +303,10 @@ int WxeApp::dispatch(wxeFifo * batch, int blevel, int list_type) break; } event->Delete(); - if(list_type == WXE_NORMAL) { - if(wxe_cb_invoked) - return blevel; - else - erl_drv_mutex_lock(wxe_batch_locker_m); - } + erl_drv_mutex_lock(wxe_batch_locker_m); + batch->Cleanup(); } - if(list_type == WXE_STORED) - return blevel; - if(blevel <= 0) { // list_type == WXE_NORMAL - if(wxe_queue->m_old) { - driver_free(wxe_queue->m_old); - wxe_queue->m_old = NULL; - } + if(blevel <= 0) { erl_drv_mutex_unlock(wxe_batch_locker_m); return blevel; } @@ -348,12 +320,13 @@ int WxeApp::dispatch(wxeFifo * batch, int blevel, int list_type) } } -void WxeApp::dispatch_cb(wxeFifo * batch, wxeFifo * temp, ErlDrvTermData process) { +void WxeApp::dispatch_cb(wxeFifo * batch, ErlDrvTermData process) { wxeCommand *event; + unsigned int peek; erl_drv_mutex_lock(wxe_batch_locker_m); + peek = batch->Cleanup(batch->cb_start); while(true) { - while((event = batch->Get()) != NULL) { - erl_drv_mutex_unlock(wxe_batch_locker_m); + while((event = batch->Peek(&peek)) != NULL) { wxeMemEnv *memenv = getMemEnv(event->port); // fprintf(stderr, " Ev %d %lu\r\n", event->op, event->caller); if(event->caller == process || // Callbacks from CB process only @@ -361,8 +334,8 @@ void WxeApp::dispatch_cb(wxeFifo * batch, wxeFifo * temp, ErlDrvTermData process event->op == WXE_CB_DIED || // Event callback process died // Allow connect_cb during CB i.e. msg from wxe_server. (memenv && event->caller == memenv->owner)) { + erl_drv_mutex_unlock(wxe_batch_locker_m); switch(event->op) { - case -1: case WXE_BATCH_END: case WXE_BATCH_BEGIN: case WXE_DEBUG_PING: @@ -373,52 +346,42 @@ void WxeApp::dispatch_cb(wxeFifo * batch, wxeFifo * temp, ErlDrvTermData process memcpy(cb_buff, event->buffer, event->len); } // continue case WXE_CB_DIED: + batch->cb_start = 0; event->Delete(); + erl_drv_mutex_lock(wxe_batch_locker_m); + batch->Strip(); + erl_drv_mutex_unlock(wxe_batch_locker_m); return; case WXE_CB_START: // CB start from now accept message from CB process only process = event->caller; break; default: - size_t start=temp->m_n; + batch->cb_start = peek; // In case of recursive callbacks if(event->op < OPENGL_START) { - // fprintf(stderr, " cb %d \r\n", event->op); wxe_dispatch(*event); } else { gl_dispatch(event->op,event->buffer,event->caller,event->bin); } - if(temp->m_n > start) { - erl_drv_mutex_lock(wxe_batch_locker_m); - // We have recursed dispatch_cb and messages for this - // callback may be saved on temp list move them - // to orig list - for(unsigned int i=start; i < temp->m_n; i++) { - wxeCommand *ev = &temp->m_q[(temp->m_first+i) % temp->m_max]; - if(ev->caller == process) { - batch->Append(ev); - } - } - erl_drv_mutex_unlock(wxe_batch_locker_m); - } break; } event->Delete(); - } else { - // fprintf(stderr, " save %d %lu\r\n", event->op, event->caller); - temp->Append(event); + erl_drv_mutex_lock(wxe_batch_locker_m); + peek = batch->Cleanup(peek); } - erl_drv_mutex_lock(wxe_batch_locker_m); } // sleep until something happens // fprintf(stderr, "%s:%d sleep %d %d\r\n", __FILE__, __LINE__, - // batch->m_n, temp->m_n);fflush(stderr); + // peek, batch->m_n);fflush(stderr); wxe_needs_signal = 1; - while(batch->m_n == 0) { + while(peek >= batch->m_n) { erl_drv_cond_wait(wxe_batch_locker_c, wxe_batch_locker_m); + peek = batch->Cleanup(peek); } wxe_needs_signal = 0; } } + /* Memory handling */ void WxeApp::newMemEnv(wxeMetaCommand& Ecmd) { diff --git a/lib/wx/c_src/wxe_impl.h b/lib/wx/c_src/wxe_impl.h index d6d3095a0f..fd25296c73 100644 --- a/lib/wx/c_src/wxe_impl.h +++ b/lib/wx/c_src/wxe_impl.h @@ -67,8 +67,8 @@ public: void shutdown(wxeMetaCommand& event); - int dispatch(wxeFifo *, int, int); - void dispatch_cb(wxeFifo * batch, wxeFifo * temp, ErlDrvTermData process); + int dispatch(wxeFifo *); + void dispatch_cb(wxeFifo * batch, ErlDrvTermData process); void wxe_dispatch(wxeCommand& event); -- cgit v1.2.3