且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

network.c

更新时间:2022-08-22 11:34:48

 
 
#include "network.h"
#include "buffer.h"
#include "websocket.h"

int eag = 0;

static int
    on_message(http_parser *parser)
{
    return 0;
}

static int
    on_path(http_parser *parser, const char *at, size_t len)
{
    rio_client *client = parser->data;
    client->path = malloc(sizeof(char) * (len+1));

    if (client->path == NULL) {
        error_exit("Malloc");
    }
    strncpy(client->path, at, len);
    client->path[len] = '\0';

    client->method = (unsigned char) parser->method;

    debug_print("HTTP-REQ method: %d\n", (int) client->method);
    debug_print("HTTP-REQ Path: %s %d\n", client->path, (int) len);

    return 0;
}

int on_header_field(http_parser *parser, const char *at, size_t len) {
    return 0;
}

int on_header_value(http_parser *parser, const char *at, size_t len) {
    return 0;
}

int on_headers_complete(http_parser *parser) {
    return 0;
}

int on_body(http_parser *parser, const char *at, size_t len) {
    return 0;
}

int on_message_complete(http_parser *parser) {
    return 0;
}

http_parser_settings parser_settings =
{
    on_message,
    on_path,
    on_header_field,
    on_header_value,
    on_headers_complete,
    on_body,
    on_message_complete
};

void
    handle_write(rio_worker *worker, rio_client *cli, char* resp)
{
    struct epoll_event event;
    int s, ret;
    khiter_t k;

    cli->buffer = new_rio_buffer_size(strlen(resp));
    rio_buffer_copy_data(cli->buffer, resp, strlen(resp));

    free(resp);

    debug_print("Handle Write: %d : %s\n", cli->fd,
                                rio_buffer_get_data(cli->buffer));

    event.events = EPOLLOUT;
    event.data.fd = cli->fd;

    if (epoll_ctl(worker->epoll_fd, EPOLL_CTL_MOD, cli->fd, &event) == -1) {
        debug_print("Error on epoll_ctl_mod on %d\n",
                                                cli->fd, worker->epoll_fd);
    }

    k = kh_put(clients, h, cli->fd , &ret);
    kh_value(h, k) = *cli;
}

void
    do_write(rio_worker *worker, rio_client *cli, struct epoll_event *event)
{
    int sent;

    debug_print("Do Write to fd: %d : %s\n", cli->fd,
                                    rio_buffer_get_data(cli->buffer));

    do {
        sent = send(cli->fd,
                    rio_buffer_get_data(cli->buffer),
                    cli->buffer->length,
                    MSG_DONTWAIT);

        if (sent < 0 && errno != EAGAIN) {
            debug_print("Do Write: send error on fd: %d errno: %d\n",
                        cli->fd, errno);
            break;
        } else if (sent < 0 && errno == EAGAIN) {
             debug_print("Do Write: EAGAIN on fd: %d\n", cli->fd);
             break;
        } else if (sent > 0) {
            rio_buffer_adjust(cli->buffer, sent);
        }
    } while (sent > 0 && rio_buffer_get_data(cli->buffer) != NULL);

    debug_print("Do Write sent: %d strlen: %zu\n", sent, cli->buffer->length);

    remove_and_close(cli, worker, event);

}

int
    handle_read(rio_worker *worker, rio_client *cli, struct epoll_event *ev)
{
    size_t len = 4096;
    ssize_t received = 0;
    ssize_t total_received = 0;

    //allocate space for data
    cli->buffer = new_rio_buffer_size(sizeof(char) * 4096);

    debug_print("Handle Read from %d\n", cli->fd);

    do {
        received = recv(ev->data.fd, cli->buffer->content, len, MSG_DONTWAIT);
        if (received < 0) {
            if (errno != EAGAIN && errno != EWOULDBLOCK) {
                if (received == 0) {
                    //if error, remove from epoll and close socket
                    debug_print("Client received error: disconnected"
                                "errno %d\n", errno);
                } else {
                    debug_print("Some other error %d\n", errno);
                }
                //handle_http will take care of this :)
            } else {
                //received += 1;
                debug_print("EAGAIN on recv from fd: %d\n", cli->fd);

                //if EAGAIN, insert on epoll again
                ev->events = EPOLLIN | EPOLLET;
                //add socket to epoll
                if (epoll_ctl(worker->epoll_fd,
                                EPOLL_CTL_MOD,
                                cli->fd,
                                ev) == -1) {
                    error_exit("Could not add conn_sock to epoll");
                }

                eag += 1;
                printf("EAGAIN %d\n", eag);

            }
            break;
        } else if (received == 0) {
            //client disconnected
            return 0;
        } else {
            total_received += received;
            debug_print("READ AGAIN on %d\n", cli->fd);
        }
    } while (received > 0 && rio_buffer_get_data(cli->buffer) != NULL);

    debug_print("Total received %zu\n", total_received);

    cli->buffer->length = total_received;
    return received;
}

int
    remove_and_close(rio_client *client,
                     rio_worker *worker,
                     struct epoll_event *event)
{
    int rc = epoll_ctl(worker->epoll_fd, EPOLL_CTL_DEL, client->fd, event);

    if (rc == -1) {
       debug_print("[WARNING] on epoll_ctl_del on %d\n",
                   client->fd, worker->epoll_fd);
    }

    if (close(client->fd) == -1) {
        debug_print("Error on close client %d\n",
                                            client->fd, worker->epoll_fd);
    }

    if (client->buffer != NULL) {
        rio_buffer_free(&client->buffer);
    }

    return rc;
}

void
    handle_http(rio_worker *worker, struct epoll_event event, rio_client *cli)
{
    int response;

    char buf[4096];
    char resp[1024];

    size_t n;

    if (event.events & EPOLLIN) {
        //handle read
        int received = handle_read(worker, cli, &event);

        //create http parser
        http_parser *parser = malloc(sizeof(http_parser));

        if (!parser){
            error_exit("malloc error: http_parser");
        }

        http_parser_init(parser, HTTP_REQUEST);

        //set parser data
        parser->data = (void*)cli;

        //execute http parsing only if data was read
        if (received > 0) {
            debug_print("Execute http parsing client: %d\n", cli->fd);
            n = http_parser_execute(parser,
                                    &parser_settings,
                                    rio_buffer_get_data(cli->buffer),
                                    received);
        }

        if (parser->upgrade) {
            //#TODO: what to do?
        } else if (received == 0) { // client disconnected!
            debug_print("Client %d Disconnected!\n", cli->fd);

            //delete fd from epoll and close
            remove_and_close(cli, worker, &event);

            free(parser);
            return;
        } else if (n != received) {
            debug_print("Error parsing, closing socket n:%zu received:%d\n",
                        n, received);

            //delete fd from epoll and close
            remove_and_close(cli, worker, &event);

            free(parser);
            return;
        }

        rio_buffer_free(&cli->buffer);
        response = dispatch(cli, cli->path);

        if (response != DISPATCH_FINISHED) {
            //write response
            debug_print("Async Dispatch to fd: %d\n", cli->fd);
        }

        debug_print("Freeing %s\n", cli->path);

        free(cli->path);
        cli->path = NULL;
        free(parser);
    } else if (event.events & EPOLLOUT) {
        //if socket is ready to write, do it!
        do_write(worker, cli, &event);
    }
}

void
    init_clients()
{
   h = kh_init(clients);
}

void
    free_clients()
{
    khiter_t element;
    rio_client *cli;

    debug_print("Closing clients structures\n", h);

    for (element = kh_begin(h); element != kh_end(h); ++element) {
        if (kh_exist(h, element)) {
            debug_print("%d\n", ((rio_client) kh_val(h, element)).fd);
            kh_del(clients, h, element);
        }
    }

    kh_destroy(clients, h);
}

int
    socket_bind()
{
    int server_fd;
    int arg;
    struct sockaddr_in sin;

    //bind
    memset(&sin, 0, sizeof(struct sockaddr_in));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(80);
    sin.sin_addr.s_addr = inet_addr("0.0.0.0");

    //create socket
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        error_exit("Could not create socket.");
    }

    //set socket non-blocking
    if (fcntl(server_fd, F_SETFL, O_NONBLOCK) == -1) {
        error_exit("Could not set socket non-blocking");
    }

    //set socket options
    arg = 1;
    if (setsockopt (server_fd, SOL_SOCKET, SO_REUSEADDR, &arg, sizeof(arg)) == -1) {
        error_exit("Socket options");
    }

    //bind socket to local addr
    if (bind(server_fd, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
        error_exit("bind");
    }

    //listen on this socket
    if (listen(server_fd, MAX_EVENTS) < 0) {
        error_exit("listen");
    }

    return server_fd;
}

void
    accept_incoming_connection(rio_runtime *runtime, rio_worker *worker)
{
    int new_connection_socket;
    int flags;
    int ret;

    unsigned int client_len;

    struct epoll_event ev;
    struct sockaddr_in temp_client;

    rio_client cli;
    khiter_t k;

    client_len = sizeof(temp_client);

    //accept client connection
    new_connection_socket = accept(runtime->server_fd,
                                  (struct sockaddr *) &temp_client,
                                   &client_len);

    if (new_connection_socket == -1) {
        //#TODO what to do?
        error_exit("Could not accept socket");
    }

    //check sockets flags and set non-blocking after
    if (-1 == (flags = fcntl(new_connection_socket, F_GETFL, 0))) {
        flags = 0;
    }

    if (fcntl(new_connection_socket, F_SETFL, flags | O_NONBLOCK) == -1) {
        error_exit("Could not set client socket non-blocking");
    }

    ev.events = EPOLLIN | EPOLLET;
    ev.data.fd = new_connection_socket;

    //add socket to epoll
    if (epoll_ctl(worker->epoll_fd,
                  EPOLL_CTL_ADD,
                  new_connection_socket,
                  &ev) == -1) {
        error_exit("Could not add conn_sock to epoll");
    }

    //store client information
    cli.fd = new_connection_socket;
    cli.websocket = 0;
    cli.buffer = NULL;

    k = kh_put(clients, h, new_connection_socket , &ret);
    kh_value(h, k) = cli;

    debug_print("New Client: %d\n", cli.fd);
}

void
    run_worker(int id, rio_worker* worker, rio_runtime *runtime)
{
    int size_epoll_events;
    int rc;

    struct epoll_event ev, events[MAX_EVENTS];

    khiter_t k;
    rio_client cli;

    sprintf(worker->name, "worker %d", id);
    debug_print("Identifying worker as %s pid %d\n", worker->name,
                                                     getpid());

    init_clients();
    init_dispatcher();
    init_static_server();

    worker->zmq_context = zmq_init(1);
    worker->master = zmq_socket(worker->zmq_context, ZMQ_SUB);

    zmq_setsockopt(worker->master, ZMQ_SUBSCRIBE, "", strlen(""));
    zmq_connect(worker->master, "ipc:///tmp/rio_master.sock");

    //create epoll
    worker->epoll_fd = epoll_create(MAX_EVENTS);
    if (worker->epoll_fd == -1) {
        error_exit("epoll_create");
    }

    //configure epoll events and file descriptor
    ev.events = EPOLLIN | EPOLLPRI;
    ev.data.fd = runtime->server_fd;

    //add listen socket to epoll
    if (epoll_ctl(worker->epoll_fd,
                  EPOLL_CTL_ADD,
                  runtime->server_fd,
                  &ev) == -1) {
        error_exit("epoll_ctl: listen_sock");
    }

    while (1) {
        //poll events
        size_epoll_events = epoll_wait(worker->epoll_fd,
                                       events,
                                       MAX_EVENTS,
                                       100);

        if (size_epoll_events == -1 && errno != EWOULDBLOCK) {
            break;
        }

        for (int n = 0; n < size_epoll_events; ++n) {
            //if event fd == server fd -> accept new connection
            if (events[n].data.fd == runtime->server_fd) {
                accept_incoming_connection(runtime, worker);
            } else { //handle in out readyness :)
                //retrieve client info by fd and handle event
                k = kh_get(clients, h, events[n].data.fd);
                cli = kh_val(h, k);
                handle_http(worker, events[n], &cli);
            }
        }
        //dispatch responses
        dispatch_responses(worker);

        //look for master messages
        zmq_msg_t msg;
        zmq_msg_init(&msg);
        rc = zmq_recv(worker->master, &msg, ZMQ_NOBLOCK);
        if (rc == 0) {
            debug_print("Worker %d Received %s from master\n",
                                            id,
                                            (char *) zmq_msg_data(&msg));
        }

        if (strcmp((char *) zmq_msg_data(&msg), "terminate") == 0) {
            zmq_msg_close(&msg);
            break;
        }

        zmq_msg_close(&msg);

    }
    debug_print("\nWorker terminating gracefully\n", worker);

    rc = zmq_close(worker->master);
    debug_print("Worker ZMQ Socket close return %d\n", rc);

    rc = zmq_term(worker->zmq_context);
    debug_print("Worker ZMQ Context termination return :%d\n", rc);

    free_clients();
    destroy_static_server();
    destroy_dispatcher();
    close(worker->epoll_fd);
}