/*
* %CopyrightBegin%
*
* Copyright Ericsson AB 1998-2011. All Rights Reserved.
*
* The contents of this file are subject to the Erlang Public License,
* Version 1.1, (the "License"); you may not use this file except in
* compliance with the License. You should have received a copy of the
* Erlang Public License along with this software. If not, it can be
* retrieved online at http://www.erlang.org/.
*
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
* the License for the specific language governing rights and limitations
* under the License.
*
* %CopyrightEnd%
*
*/
#include <stdlib.h>
#include <string.h>
#include "eidef.h"
#include "eiext.h"
#include "reg.h"
#include "eisend.h"
#include "eirecv.h"
#include "ei_connect_int.h"
/*
 * Ask the remote node to start dumping into the Mnesia table `mntab`.
 *
 * Sends { Self, { call, EI_MNESIA_MODULE, EI_MNESIA_DUMP, [mntab, Self], user } }
 * to the registered name "rex" over `fd`, then waits until both a link and a
 * { rex, Pid } message have been received (in either order).  The pid from
 * the reply is stored in *mnesia.
 *
 * Returns 0 once both have arrived.  Returns -1 if the send fails, if any
 * other kind of message (or a receive error) is seen, if a link or a reply
 * arrives a second time, or if the reply is not of the form { rex, Pid }.
 */
static int mn_start_dump(int fd, const erlang_pid *self,
			 erlang_pid *mnesia, const char *mntab)
{
  char buf[EISMALLBUF];
  char *bufp = buf;
  /* Receives the atom from the reply.  ei_decode_atom() may store an atom
   * of up to MAXATOMLEN bytes plus a terminator, so this must not be
   * smaller than the 256 bytes mn_send_commit() uses for the same job. */
  char tmpbuf[256];
  int index = 0;
  erlang_msg msg;
  int type;
  int arity;
  int version;
  int msglen;
  int i;
  int needlink = 1;
  int needpid = 1;

  /* set up rpc arguments */
  /* { PidFrom, { call, Mod, Fun, Args, user }}  */
  ei_encode_version(buf,&index);
  ei_encode_tuple_header(buf,&index,2);
  ei_encode_pid(buf,&index,self);               /* PidFrom */
  ei_encode_tuple_header(buf,&index,5);
  ei_encode_atom(buf,&index,"call");            /* call */
  ei_encode_atom(buf,&index,EI_MNESIA_MODULE);  /* Mod */
  ei_encode_atom(buf,&index,EI_MNESIA_DUMP);    /* Fun */
  ei_encode_list_header(buf,&index,2);          /* Args: [ table, self() ] */
  ei_encode_atom(buf,&index,mntab);
  ei_encode_pid(buf,&index,self);
  ei_encode_empty_list(buf,&index);
  ei_encode_atom(buf,&index,"user");            /* user */

  /* make the rpc call */
  if (ei_send_reg_encoded(fd,self,"rex",buf,index)) return -1;

  /* get the reply: expect link and pid (not sure which will come first though) */
  while (needlink || needpid) {
    /* get message; keep receiving while the call returns 0
     * (presumably a tick - confirm against ei_recv_internal) */
    do {
      index = EISMALLBUF;
      i = ei_recv_internal(fd,&bufp,&index,&msg,&msglen,1,0);
    } while (i == 0);

    switch (i) {
    case ERL_LINK:
      /* got link; a second one is a protocol error */
      if (!needlink) return -1;
      needlink = 0;
      break;

    case ERL_SEND:
      /* got message; a second one is a protocol error */
      if (!needpid) return -1;

      /* expecting { rex, <pid> } */
      index = 0;
      if (ei_decode_version(buf,&index,&version)
	  || ei_decode_tuple_header(buf,&index,&arity)
	  || (arity != 2)
	  || ei_decode_atom(buf,&index,tmpbuf)
	  || strcmp(tmpbuf,"rex")
	  || ei_get_type_internal(buf,&index,&type,&arity)
	  || (type != ERL_PID_EXT))
	return -1; /* bad response from other side */

      if (ei_decode_pid(buf,&index,mnesia)) return -1;

      /* got pid */
      needpid = 0;
      break;

    default:
      return -1; /* wasn't link or pid (negative error returns land here too) */
    }
  }
  return 0;
}
/*
 * Send { EI_MNESIA_COMMIT, Self } to the `mnesia` pid and wait for the answer.
 *
 * Returns 0 only when the answer is a message whose tuple starts with the
 * atom `ok`.  Returns -1 if the send fails, the receive fails, some other
 * kind of message arrives, or the answer cannot be decoded.
 */
static int mn_send_commit(int fd, erlang_pid *mnesia, erlang_pid *self)
{
  char buf[EISMALLBUF];
  char *recvp = buf;
  char reply[256];
  erlang_msg msg;
  int pos = 0;
  int vsn;
  int arity;
  int msglen;
  int got;

  /* build and send { commit, self() } */
  ei_encode_version(buf,&pos);
  ei_encode_tuple_header(buf,&pos,2);
  ei_encode_atom(buf,&pos,EI_MNESIA_COMMIT);
  ei_encode_pid(buf,&pos,self);
  if (ei_send_encoded(fd,mnesia,buf,pos)) return -1;

  /* receive until the call yields something other than 0 */
  do {
    pos = EISMALLBUF;
    got = ei_recv_internal(fd,&recvp,&pos,&msg,&msglen,1,0);
  } while (got == 0);

  if (got < 0) return -1;         /* receive failed */
  if (got != ERL_SEND) return -1; /* wrong message type */

  /* the reply must decode as a tuple whose first element is an atom */
  pos = 0;
  if (ei_decode_version(buf,&pos,&vsn)) return -1;
  if (ei_decode_tuple_header(buf,&pos,&arity)) return -1;
  if (ei_decode_atom(buf,&pos,reply)) return -1;

  return strcmp(reply,"ok") ? -1 : 0;
}
static int mn_send_delete(int fd, erlang_pid *mnesia, const char *key)
{
char sbuf[EISMALLBUF];
char *dbuf = NULL;
char *msgbuf;
int index = 0;
int len = strlen(key) + 32; /* 32 is a slight overestimate */
if (len > EISMALLBUF)
if (!(dbuf = malloc(len)))
return -1;
msgbuf = (dbuf ? dbuf : sbuf);
/* set up delete message { delete, Key } */
ei_encode_version(msgbuf,&index);
ei_encode_tuple_header(msgbuf,&index,2);
ei_encode_atom(msgbuf,&index,EI_MNESIA_DELETE);
ei_encode_string(msgbuf,&index,key);
/* send it */
if (ei_send_encoded(fd,mnesia,msgbuf,index)) {
if (dbuf) free(dbuf);
return -1;
}
if (dbuf) free(dbuf);
return 0;
}
/*
 * Encode the write message for one registry object at msgbuf + *index and
 * advance *index past it:
 *
 *   { EI_MNESIA_WRITE, Key, Keylen, Attr, Size, Value }
 *
 * Value is chosen by ei_reg_typeof(obj): a long for EI_INT, a double for
 * EI_FLT, a string or binary for EI_STR / EI_BIN with a positive size, and a
 * long (the NULL pointer, or the raw pointer value for EI_BIN) when the size
 * is not positive.
 *
 * msgbuf may be NULL: the ei encoders then only advance *index, which yields
 * the exact encoded size without writing anything.
 *
 * Returns 0 on success, -1 if the object has an unknown type.
 */
static int mn_encode_write(char *msgbuf, int *index, const char *key,
			   int keylen, ei_reg_obj *obj)
{
  ei_encode_version(msgbuf,index);
  ei_encode_tuple_header(msgbuf,index,6);
  ei_encode_atom(msgbuf,index,EI_MNESIA_WRITE);
  ei_encode_string(msgbuf,index,key);
  ei_encode_long(msgbuf,index,keylen);
  ei_encode_long(msgbuf,index,obj->attr);
  ei_encode_long(msgbuf,index,obj->size);

  switch (ei_reg_typeof(obj)) {
  case EI_INT:
    ei_encode_long(msgbuf,index,obj->val.i);
    break;
  case EI_FLT:
    ei_encode_double(msgbuf,index,obj->val.f);
    break;
  case EI_STR:
    if (obj->size > 0) ei_encode_string(msgbuf,index,obj->val.s);
    else ei_encode_long(msgbuf,index, (long)NULL);  /* just the NULL pointer */
    break;
  case EI_BIN:
    if (obj->size > 0) ei_encode_binary(msgbuf,index,obj->val.p,obj->size);
    else ei_encode_long(msgbuf,index,(long)(obj->val.p));  /* just the pointer */
    break;
  default:
    return -1;
  }
  return 0;
}

/*
 * Send a write request for registry object `obj`, stored under `key`, to the
 * `mnesia` pid over `fd`.
 *
 * The message is built in a stack buffer when it fits in EISMALLBUF bytes and
 * in a temporary heap buffer otherwise.  The required size is measured with a
 * preflight encoding pass rather than estimated from obj->size, so the buffer
 * is always large enough for the integer fields and the value as encoded.
 *
 * Returns 0 on success, -1 if the object type is unknown, memory cannot be
 * allocated, or the send fails.
 */
static int mn_send_write(int fd, erlang_pid *mnesia, const char *key, ei_reg_obj *obj)
{
  char sbuf[EISMALLBUF];
  char *dbuf = NULL;
  char *msgbuf = sbuf;
  int index = 0;
  int len = 0;
  int keylen = strlen(key) + 1; /* sent as the Keylen field */
  int res;

  /* preflight: rejects unknown types and measures the encoded size */
  if (mn_encode_write(NULL,&len,key,keylen,obj))
    return -1;

  if (len > EISMALLBUF) {
    if (!(dbuf = malloc(len)))
      return -1;
    msgbuf = dbuf;
  }

  /* encode for real, then send it */
  if (mn_encode_write(msgbuf,&index,key,keylen,obj))
    res = -1;
  else
    res = ei_send_encoded(fd,mnesia,msgbuf,index) ? -1 : 0;

  free(dbuf); /* no-op when the stack buffer was used */
  return res;
}
/*
 * Wait on `fd` for the peer to unlink.
 *
 * Keeps receiving while ei_recv_internal() returns 0.  Returns 0 when an
 * ERL_UNLINK arrives and -1 for any other result (other message kinds and
 * receive errors alike).
 */
static int mn_get_unlink(int fd)
{
  erlang_msg msg;
  char buf[EISMALLBUF];
  char *recvp = buf;
  int size;
  int msglen;
  int got;

  for (;;) {
    size = EISMALLBUF;
    got = ei_recv_internal(fd,&recvp,&size,&msg,&msglen,1,0);

    if (got == 0) continue; /* nothing for us yet, try again */

    return (got == ERL_UNLINK) ? 0 : -1;
  }
}
/*
 * Dump the registry `reg` to the Mnesia table `mntab`.
 *
 * fd is an open connection to an Erlang node.  A pid for this side is built
 * from the cnode associated with fd, the dump is started on the remote node,
 * and every object that is marked EI_DIRTY (or every object, when flags has
 * EI_FORCE set) is sent: as a delete if it is marked EI_DELET, otherwise as a
 * write.  The transfer is then committed and the function waits for the
 * remote side to unlink.  Unless flags has EI_NOPURGE set, ei_reg_purge() is
 * finally called on the registry.
 *
 * Returns 0 on success.  Returns -1 on bad arguments, if fd has no cnode, or
 * if any step fails; when a delete, write or commit fails an exit signal
 * naming the failed step is sent to the remote process first.
 */
int ei_reg_dump(int fd, ei_reg *reg, const char *mntab, int flags)
{
  ei_hash *tab;
  erlang_pid self;
  erlang_pid mnesia;
  ei_bucket *bucket;
  ei_reg_obj *obj;
  ei_cnode *ec;
  int slot;

  if (!reg || !mntab) return -1; /* return EI_BADARG; */
  tab = reg->tab;

  /* make a self pid */
  ec = ei_fd_to_cnode(fd);
  if (ec == NULL) return -1;

  strcpy(self.node,ei_thisnodename(ec));
  self.num = fd;
  self.serial = 0;
  self.creation = ei_thiscreation(ec);

  if (mn_start_dump(fd,&self,&mnesia,mntab)) return -1;

  /* walk every bucket chain, passing objects to mnesia */
  for (slot=0; slot<tab->size; slot++) {
    for (bucket=tab->tab[slot]; bucket; bucket=bucket->next) {
      obj = (ei_reg_obj*)(bucket->value); /* cast to eliminate 'const' warning */

      /* skip clean objects unless the caller forces a full dump */
      if (!(flags & EI_FORCE) && !(obj->attr & EI_DIRTY)) continue;

      if (obj->attr & EI_DELET) {
	if (mn_send_delete(fd,&mnesia,bucket->key)) {
	  ei_send_exit(fd,&self,&mnesia,"delete failed");
	  return -1;
	}
      }
      else if (mn_send_write(fd,&mnesia,bucket->key,obj)) {
	ei_send_exit(fd,&self,&mnesia,"update failed");
	return -1;
      }
    }
  }

  /* end the transaction */
  if (mn_send_commit(fd,&mnesia,&self)) {
    ei_send_exit(fd,&self,&mnesia,"commit failed");
    return -1;
  }

  /* wait for unlink */
  if (mn_get_unlink(fd)) return -1;

  /* everything went ok: remove all deleted objects, unless the caller
   * asked us not to */
  if (!(flags & EI_NOPURGE)) ei_reg_purge(reg);

  return 0;
}