include gateway handling into the event loop

This commit is contained in:
Florian Stinglmayr 2020-01-14 14:36:11 +01:00
parent c5c647b054
commit 9e11402d31
4 changed files with 98 additions and 27 deletions

View File

@ -62,6 +62,11 @@ void dc_gateway_set_login(dc_gateway_t gw, dc_account_t login);
void dc_gateway_set_callback(dc_gateway_t gw, dc_gateway_event_callback_t c,
void *userdata);
/**
* Returns the socket of the current gateway handle.
*/
int dc_gateway_socket(dc_gateway_t gw);
/**
* Connect the given gateway. Does nothing if the gateway is already
* connected.
@ -80,12 +85,20 @@ void dc_gateway_disconnect(dc_gateway_t gw);
bool dc_gateway_connected(dc_gateway_t gw);
/**
* Process the queue of data that came from the websocket. Since the
* gateway handle is not part of whole event_base_loop() shebang, this
* must be called individually. dc_loop_once() will do this for you, if
* you opt to use dc_loop() (which you should).
* This method should be called whenever data is available on the socket of
* the gateway that should be read and processed. This function returns false
* if a disconnect happened, and no more calls to this function should be made
* in the feature. This method is useful if you are using an event loop (or
* select) and wish to notify the gateway that data is ready.
*/
void dc_gateway_process(dc_gateway_t gw);
bool dc_gateway_process_read(dc_gateway_t gw);
/**
* This method should be called whenever the socket is ready to send data. The
* method will check internal queues for messages that require sending, and will
* also handle the heartbeat. If the gateway closed this function returns false.
*/
bool dc_gateway_process_write(dc_gateway_t gw);
/**
* utility function to make a websocket frame

View File

@ -316,7 +316,7 @@ static bool dc_gateway_handle_op(dc_gateway_t gw, json_t *j)
return true;
}
static void dc_gateway_process_read(dc_gateway_t gw)
static void dc_gateway_read(dc_gateway_t gw)
{
int ret = 0;
char buf[100] = {0};
@ -412,12 +412,21 @@ cleanup:
return r;
}
void dc_gateway_process(dc_gateway_t gw)
int dc_gateway_socket(dc_gateway_t gw)
{
curl_socket_t sock = -1;
return_if_true(gw->easy == NULL, -1);
curl_easy_getinfo(gw->easy, CURLINFO_ACTIVESOCKET, &sock);
return (int)sock;
}
bool dc_gateway_process_write(dc_gateway_t gw)
{
time_t diff = 0;
if (!dc_gateway_connected(gw)) {
return;
return false;
}
if (gw->heartbeat_interval > 0) {
@ -427,12 +436,27 @@ void dc_gateway_process(dc_gateway_t gw)
}
}
dc_gateway_process_read(gw);
while (gw->out->len > 0) {
json_t *j = g_ptr_array_index(gw->out, 0);
dc_gateway_process_out(gw, j);
g_ptr_array_remove_index(gw->out, 0);
}
return true;
}
bool dc_gateway_process_read(dc_gateway_t gw)
{
if (!dc_gateway_connected(gw)) {
return false;
}
dc_gateway_read(gw);
if (gw->buffer->len > 0) {
dc_gateway_process_frame(gw);
if (!dc_gateway_connected(gw)) {
return;
return false;
}
}
@ -440,9 +464,5 @@ void dc_gateway_process(dc_gateway_t gw)
dc_gateway_process_in(gw);
}
while (gw->out->len > 0) {
json_t *j = g_ptr_array_index(gw->out, 0);
dc_gateway_process_out(gw, j);
g_ptr_array_remove_index(gw->out, 0);
}
return true;
}

View File

@ -69,6 +69,39 @@ static void dc_loop_free(dc_loop_t p)
free(p);
}
static void gateway_handler(int sock, short what, void *data)
{
dc_loop_t l = (dc_loop_t)data;
bool again;
int i = 0;
dc_gateway_t gw = NULL;
for (; i < l->gateways->len; i++) {
if (dc_gateway_socket(g_ptr_array_index(l->gateways, i)) == sock) {
gw = g_ptr_array_index(l->gateways, i);
break;
}
}
if (gw == NULL) {
/* no longer in the list
*/
return;
}
if ((what & EV_READ) == EV_READ) {
again = dc_gateway_process_read(gw);
} else if ((what & EV_WRITE) == EV_WRITE) {
again = dc_gateway_process_write(gw);
}
if (again) {
struct event *ev = NULL;
ev = event_new(l->base, sock, EV_READ|EV_WRITE, gateway_handler, l);
event_add(ev, NULL);
}
}
static void socket_handler(int sock, short what, void *data)
{
int unused = 0;
@ -232,6 +265,16 @@ void dc_loop_remove_api(dc_loop_t loop, dc_api_t api)
void dc_loop_add_gateway(dc_loop_t l, dc_gateway_t gw)
{
return_if_true(l == NULL || gw == NULL,);
int sock = 0;
struct event *ev = NULL;
sock = dc_gateway_socket(gw);
return_if_true(sock == -1,);
ev = event_new(l->base, sock, EV_READ|EV_WRITE, gateway_handler, l);
event_add(ev, NULL);
g_ptr_array_add(l->gateways, dc_ref(gw));
}
@ -275,17 +318,5 @@ bool dc_loop_once(dc_loop_t l)
}
}
for (i = 0; i < l->gateways->len; i++) {
dc_gateway_t gw = g_ptr_array_index(l->gateways, i);
if (!dc_gateway_connected(gw)) {
if (!dc_gateway_connect(gw)) {
continue;
}
}
dc_gateway_process(gw);
}
return true;
}

View File

@ -275,6 +275,7 @@ bool dc_session_logout(dc_session_t s)
}
if (s->gateway != NULL) {
dc_gateway_disconnect(s->gateway);
dc_loop_remove_gateway(s->loop, s->gateway);
dc_unref(s->gateway);
s->gateway = NULL;
@ -323,6 +324,12 @@ bool dc_session_login(dc_session_t s, dc_account_t login)
dc_gateway_set_callback(s->gateway, dc_session_handler, s);
dc_gateway_set_login(s->gateway, s->login);
if (!dc_gateway_connect(s->gateway)) {
dc_session_logout(s);
return false;
}
dc_loop_add_gateway(s->loop, s->gateway);
}