#include "network.h"
#include "buffer.h"
#include "websocket.h"
int eag
= 0;
static int
on_message(http_parser
*parser)
{
return 0;
}
static int
on_path(http_parser
*parser,
const char *at,
size_t len)
{
rio_client
*client = parser->data;
client->path
= malloc(sizeof(char)
* (len+1));
if (client->path
== NULL)
{
error_exit("Malloc");
}
strncpy(client->path,
at, len);
client->path[len]
= '\0';
client->method
= (unsigned
char) parser->method;
debug_print("HTTP-REQ method: %d\n",
(int)
client->method);
debug_print("HTTP-REQ Path: %s %d\n",
client->path,
(int)
len);
return 0;
}
int on_header_field(http_parser
*parser,
const char *at,
size_t len)
{
return 0;
}
int on_header_value(http_parser
*parser,
const char *at,
size_t len)
{
return 0;
}
int on_headers_complete(http_parser
*parser)
{
return 0;
}
int on_body(http_parser
*parser,
const char *at,
size_t len)
{
return 0;
}
int on_message_complete(http_parser
*parser)
{
return 0;
}
http_parser_settings
parser_settings =
{
on_message,
on_path,
on_header_field,
on_header_value,
on_headers_complete,
on_body,
on_message_complete
};
void
handle_write(rio_worker
*worker,
rio_client *cli,
char* resp)
{
struct epoll_event
event;
int s,
ret;
khiter_t k;
cli->buffer
= new_rio_buffer_size(strlen(resp));
rio_buffer_copy_data(cli->buffer,
resp, strlen(resp));
free(resp);
debug_print("Handle Write: %d : %s\n",
cli->fd,
rio_buffer_get_data(cli->buffer));
event.events
= EPOLLOUT;
event.data.fd
= cli->fd;
if (epoll_ctl(worker->epoll_fd,
EPOLL_CTL_MOD, cli->fd,
&event)
== -1)
{
debug_print("Error on epoll_ctl_mod on %d\n",
cli->fd,
worker->epoll_fd);
}
k =
kh_put(clients,
h, cli->fd
, &ret);
kh_value(h,
k) =
*cli;
}
void
do_write(rio_worker
*worker,
rio_client *cli,
struct epoll_event
*event)
{
int sent;
debug_print("Do Write to fd: %d : %s\n",
cli->fd,
rio_buffer_get_data(cli->buffer));
do {
sent
= send(cli->fd,
rio_buffer_get_data(cli->buffer),
cli->buffer->length,
MSG_DONTWAIT);
if (sent
< 0 &&
errno != EAGAIN)
{
debug_print("Do Write: send error on fd: %d errno: %d\n",
cli->fd,
errno);
break;
} else
if (sent
< 0 && errno
== EAGAIN)
{
debug_print("Do Write: EAGAIN on fd: %d\n",
cli->fd);
break;
} else
if (sent
> 0) {
rio_buffer_adjust(cli->buffer,
sent);
}
} while
(sent >
0 && rio_buffer_get_data(cli->buffer)
!= NULL);
debug_print("Do Write sent: %d strlen: %zu\n",
sent, cli->buffer->length);
remove_and_close(cli,
worker, event);
}
int
handle_read(rio_worker
*worker,
rio_client *cli,
struct epoll_event
*ev)
{
size_t len
= 4096;
ssize_t
received = 0;
ssize_t
total_received = 0;
//allocate space for data
cli->buffer
= new_rio_buffer_size(sizeof(char)
* 4096);
debug_print("Handle Read from %d\n",
cli->fd);
do {
received
= recv(ev->data.fd,
cli->buffer->content,
len, MSG_DONTWAIT);
if (received
< 0)
{
if
(errno != EAGAIN
&& errno !=
EWOULDBLOCK) {
if
(received ==
0) {
//if error, remove from epoll and close socket
debug_print("Client received error: disconnected"
"errno %d\n",
errno);
}
else {
debug_print("Some other error %d\n",
errno);
}
//handle_http will take care of this :)
}
else {
//received += 1;
debug_print("EAGAIN on recv from fd: %d\n",
cli->fd);
//if EAGAIN, insert on epoll again
ev->events
= EPOLLIN |
EPOLLET;
//add socket to epoll
if
(epoll_ctl(worker->epoll_fd,
EPOLL_CTL_MOD,
cli->fd,
ev)
== -1)
{
error_exit("Could not add conn_sock to epoll");
}
eag
+= 1;
printf("EAGAIN %d\n",
eag);
}
break;
} else
if (received
== 0)
{
//client disconnected
return
0;
} else
{
total_received
+= received;
debug_print("READ AGAIN on %d\n",
cli->fd);
}
} while
(received >
0 && rio_buffer_get_data(cli->buffer)
!= NULL);
debug_print("Total received %zu\n",
total_received);
cli->buffer->length
= total_received;
return received;
}
int
remove_and_close(rio_client
*client,
rio_worker
*worker,
struct
epoll_event *event)
{
int rc
= epoll_ctl(worker->epoll_fd,
EPOLL_CTL_DEL, client->fd,
event);
if (rc
== -1)
{
debug_print("[WARNING] on epoll_ctl_del on %d\n",
client->fd,
worker->epoll_fd);
}
if (close(client->fd)
== -1)
{
debug_print("Error on close client %d\n",
client->fd,
worker->epoll_fd);
}
if (client->buffer
!= NULL)
{
rio_buffer_free(&client->buffer);
}
return rc;
}
void
handle_http(rio_worker
*worker,
struct epoll_event event,
rio_client *cli)
{
int response;
char buf[4096];
char resp[1024];
size_t n;
if (event.events
& EPOLLIN)
{
//handle read
int
received = handle_read(worker,
cli, &event);
//create http parser
http_parser
*parser = malloc(sizeof(http_parser));
if (!parser){
error_exit("malloc error: http_parser");
}
http_parser_init(parser,
HTTP_REQUEST);
//set parser data
parser->data
= (void*)cli;
//execute http parsing only if data was read
if (received
> 0)
{
debug_print("Execute http parsing client: %d\n",
cli->fd);
n
= http_parser_execute(parser,
&parser_settings,
rio_buffer_get_data(cli->buffer),
received);
}
if (parser->upgrade)
{
//#TODO: what to do?
} else
if (received
== 0)
{ // client disconnected!
debug_print("Client %d Disconnected!\n",
cli->fd);
//delete fd from epoll and close
remove_and_close(cli,
worker, &event);
free(parser);
return;
} else
if (n
!= received)
{
debug_print("Error parsing, closing socket n:%zu received:%d\n",
n,
received);
//delete fd from epoll and close
remove_and_close(cli,
worker, &event);
free(parser);
return;
}
rio_buffer_free(&cli->buffer);
response
= dispatch(cli,
cli->path);
if (response
!= DISPATCH_FINISHED)
{
//write response
debug_print("Async Dispatch to fd: %d\n",
cli->fd);
}
debug_print("Freeing %s\n",
cli->path);
free(cli->path);
cli->path
= NULL;
free(parser);
} else
if (event.events
& EPOLLOUT)
{
//if socket is ready to write, do it!
do_write(worker,
cli, &event);
}
}
void
init_clients()
{
h =
kh_init(clients);
}
void
free_clients()
{
khiter_t
element;
rio_client
*cli;
debug_print("Closing clients structures\n",
h);
for (element
= kh_begin(h);
element != kh_end(h);
++element)
{
if (kh_exist(h,
element)) {
debug_print("%d\n",
((rio_client)
kh_val(h,
element)).fd);
kh_del(clients,
h, element);
}
}
kh_destroy(clients,
h);
}
int
socket_bind()
{
int server_fd;
int arg;
struct sockaddr_in
sin;
//bind
memset(&sin,
0, sizeof(struct
sockaddr_in));
sin.sin_family
= AF_INET;
sin.sin_port
= htons(80);
sin.sin_addr.s_addr
= inet_addr("0.0.0.0");
//create socket
if ((server_fd
= socket(AF_INET,
SOCK_STREAM, 0))
< 0)
{
error_exit("Could not create socket.");
}
//set socket non-blocking
if (fcntl(server_fd,
F_SETFL, O_NONBLOCK)
== -1)
{
error_exit("Could not set socket non-blocking");
}
//set socket options
arg =
1;
if (setsockopt
(server_fd,
SOL_SOCKET, SO_REUSEADDR,
&arg,
sizeof(arg))
== -1)
{
error_exit("Socket options");
}
//bind socket to local addr
if (bind(server_fd,
(struct sockaddr
*)&sin,
sizeof(sin))
< 0)
{
error_exit("bind");
}
//listen on this socket
if (listen(server_fd,
MAX_EVENTS) <
0) {
error_exit("listen");
}
return server_fd;
}
void
accept_incoming_connection(rio_runtime
*runtime,
rio_worker *worker)
{
int new_connection_socket;
int flags;
int ret;
unsigned
int client_len;
struct epoll_event
ev;
struct sockaddr_in
temp_client;
rio_client
cli;
khiter_t
k;
client_len
= sizeof(temp_client);
//accept client connection
new_connection_socket
= accept(runtime->server_fd,
(struct
sockaddr *)
&temp_client,
&client_len);
if (new_connection_socket
== -1)
{
//#TODO what to do?
error_exit("Could not accept socket");
}
//check sockets flags and set non-blocking after
if (-1
== (flags
= fcntl(new_connection_socket,
F_GETFL, 0)))
{
flags
= 0;
}
if (fcntl(new_connection_socket,
F_SETFL, flags
| O_NONBLOCK)
== -1)
{
error_exit("Could not set client socket non-blocking");
}
ev.events
= EPOLLIN |
EPOLLET;
ev.data.fd
= new_connection_socket;
//add socket to epoll
if (epoll_ctl(worker->epoll_fd,
EPOLL_CTL_ADD,
new_connection_socket,
&ev)
== -1)
{
error_exit("Could not add conn_sock to epoll");
}
//store client information
cli.fd
= new_connection_socket;
cli.websocket
= 0;
cli.buffer
= NULL;
k =
kh_put(clients,
h, new_connection_socket
, &ret);
kh_value(h,
k) =
cli;
debug_print("New Client: %d\n",
cli.fd);
}
void
run_worker(int
id, rio_worker*
worker, rio_runtime
*runtime)
{
int size_epoll_events;
int rc;
struct epoll_event
ev, events[MAX_EVENTS];
khiter_t
k;
rio_client
cli;
sprintf(worker->name,
"worker %d", id);
debug_print("Identifying worker as %s pid %d\n",
worker->name,
getpid());
init_clients();
init_dispatcher();
init_static_server();
worker->zmq_context
= zmq_init(1);
worker->master
= zmq_socket(worker->zmq_context,
ZMQ_SUB);
zmq_setsockopt(worker->master,
ZMQ_SUBSCRIBE, "",
strlen(""));
zmq_connect(worker->master,
"ipc:///tmp/rio_master.sock");
//create epoll
worker->epoll_fd
= epoll_create(MAX_EVENTS);
if (worker->epoll_fd
== -1)
{
error_exit("epoll_create");
}
//configure epoll events and file descriptor
ev.events
= EPOLLIN |
EPOLLPRI;
ev.data.fd
= runtime->server_fd;
//add listen socket to epoll
if (epoll_ctl(worker->epoll_fd,
EPOLL_CTL_ADD,
runtime->server_fd,
&ev)
== -1)
{
error_exit("epoll_ctl: listen_sock");
}
while (1)
{
//poll events
size_epoll_events
= epoll_wait(worker->epoll_fd,
events,
MAX_EVENTS,
100);
if (size_epoll_events
== -1
&& errno !=
EWOULDBLOCK) {
break;
}
for (int
n = 0;
n < size_epoll_events;
++n)
{
//if event fd == server fd -> accept new connection
if
(events[n].data.fd
== runtime->server_fd)
{
accept_incoming_connection(runtime,
worker);
}
else { //handle in out readyness :)
//retrieve client info by fd and handle event
k
= kh_get(clients,
h, events[n].data.fd);
cli
= kh_val(h,
k);
handle_http(worker,
events[n],
&cli);
}
}
//dispatch responses
dispatch_responses(worker);
//look for master messages
zmq_msg_t
msg;
zmq_msg_init(&msg);
rc =
zmq_recv(worker->master,
&msg,
ZMQ_NOBLOCK);
if (rc
== 0)
{
debug_print("Worker %d Received %s from master\n",
id,
(char
*) zmq_msg_data(&msg));
}
if (strcmp((char
*) zmq_msg_data(&msg),
"terminate") ==
0) {
zmq_msg_close(&msg);
break;
}
zmq_msg_close(&msg);
}
debug_print("\nWorker terminating gracefully\n",
worker);
rc =
zmq_close(worker->master);
debug_print("Worker ZMQ Socket close return %d\n",
rc);
rc =
zmq_term(worker->zmq_context);
debug_print("Worker ZMQ Context termination return :%d\n",
rc);
free_clients();
destroy_static_server();
destroy_dispatcher();
close(worker->epoll_fd);
}