CS 프로그램/개발

TCP/IP 소켓프로그래밍 : 멀티플렉싱 기반 서버

parktest0325 2020. 9. 20. 21:32

IO멀티플렉싱

하나의 프로세스로 여러개의 소켓을 묶어서 핸들링 하는것

프로세스를 생성하는것은 서버에 부담이 가고, 생각보다 서버와 클라이언트의 입출력이 많지 않기 때문에 멀티프로세스 서버 보다 대부분의 경우에 더 낫다.

 

fd_set : 각 원소는 1bit이며 0과 1만 저장된다. 열러있는 소켓의 파일 디스크립터 값에 해당하는 인덱스가 1로 세팅된다. 

 ex) 리스닝 소켓 fd=5, 서비스소켓 fd=6, 서비스소켓 fd=7 이렇게 열려있다면 0000011100000...

 

select : 소켓에 대한 상태를 물어볼 수 있는 함수

 - 수신한 데이터를 지니고 있는 소켓이 있는지? (= 입력버퍼에 값이 들어있는 소켓이 있다면 fd_set에 세팅)

 - 블로킹 되지 않고 데이터 전송이 가능한 소켓은 무엇인지? 

 - 예외상황이 발생한 소켓은 무엇인지?

 

 

int select(int maxfd, fd_set* readset, fd_set* writeset, fd_set* exceptset, timeval* timeout);

 - maxfd : 검사 대상이 되는 파일디스크립터의 수 정확한 개수를 지정하는것보단 맥시멈 파일디스크립터 수 + 1 전달함

 - readset : 수신된 데이터가 존재하는 fd에만 1이 세팅됨. 관심 없으면 0 전달

 - writeset : 블로킹없이 데이터 전송가능한 fd에만 1이 세팅됨. 관심 없으면 0 전달

 - exceptset : 예외상황이 발생한 fd에만 1이 세팅됨. 관심 없으면 0 전달

 - timeout : 반환하지 않고 블록킹에 빠지지않도록 타임아웃함. 셀렉트함수 리턴시 구조체엔 잔여시간만 남아있다. 

반환값 : 보통은 변화가 발생된 디스크립터 수가 반환됨. 오류 발생시에는 -1 반환, 타임아웃에 의한 반환시에는 0 반환. 

struct timeval{
    long tv_sec;
    long tv_usec;
}

 

입력버퍼에 데이터가 들어온 소켓이 있는지 확인한다. 확인이 되면 함수가 반환되며 fd_set 구조체에서 해당 소켓을 제외하고 0으로 세팅해버린다. 원본 fd_set에는 열려있는 소켓의 정보가 저장되어있는데, 이것을 잃지 않기위해 fd_set 구조체의 복사본으로 select 함수를 호출해야한다. 

만약 두개의 데이터가 한번에 들어온 경우도 있을 수 있기 때문에 for문을 통해 검사할때 1이 발견된다 하더라도 복사본 fd_set의 구조체를 일정 범위까지 전부 확인해야한다. 

 

FD_ZERO(&fd_set) : fd_set 구조체를 0으로 초기화

FD_SET(fd, &fd_set) : fd_set 구조체에서 fd의 값에 해당하는 인덱스를 1로 세팅

FD_CLR(fd, &fd_set) : fd_set 구조체에서 fd의 값에 해당하는 인덱스를 0으로 세팅

FD_ISSET(fd, &fd_set) : fd_set 구조체에서 fd의 값에 해당하는 인덱스가 1로 세팅되어있는지 확인

 

int main(int argc, char* argv[]){
    int serv_sock, clnt_sock;
    struct sockaddr_in serv_adr, clnt_adr;
    struct timeval timeout;
    fd_set reads, cpy_reads;
    socklen_t adr_sz;
    int fd_max, str_len, fd_num, i;
    char buf[BUF_SIZE];
    
    //주소 초기화
    serv_sock=socket(PF_INET, SOCK_STREAM, 0);
    memset(&serv_adr, 0, sizeof(serv_adr));
    serv_adr.sin_family=AF_INET;
    serv_adr.sin_addr.s_addr=htonl(INADDR_ANT);
    serv_adr.sin_port=htons(atoi(argv[1]));
    
    // 포트-소켓 바인딩, 리스닝시작
    bind(serv_sock, (struct sockaddr*)&serv_adr, sizeof(serv_adr));
    listen(serv_sock, 5);
    
    FD_ZERO(&reads);
    FD_SET(serv_sock, &reads); // 리스닝 포트를 읽기 감시포트로 설정. 연결요청을 감시하기 위함
    fd_max = serv_sock; // 확인해야할 fd 마지막번호. fd는 프로세스별로 3부터 시작한다.
    
    while(1){
        cpy_reads=reads; // 복사본으로 select 함수를 호출해야한다. 
        timeout.tv_sec=5; // select 후 변조되기 때문에 while문 안에서 세팅을 계속 해줘야함.
        timeout.tv_usec=5000;
        
        // 읽기 버퍼에 데이터가 있는 fd 확인. 복사본 전달
        if((fd_num=select(fd_max+1, &cpy_reads, 0, 0, &timeout))==-1)
            break;
        if(fd_num==0) // 타임아웃으로 반환되면 다시 처음부터
            continue;
        for(i=0; i<fd_max+1; i++){ // index는 0부터 시작함. 어차피 select는 012 전부 확인함
            if(FD_ISSET(i, &cpy_reads)){
                if(i==serv_sock){ // 검사한 인덱스가 serv_sock일때 accept함수 호출
                    adr_sz=sizeof(clnt_adr);
                    clnt_sock=accept(serv_sock, (struct sockaddr*)&clnt_adr, &adr_sz);
                    FD_SET(clnt_sock, &reads);
                    if(fd_max<clnt_sock)
                        fd_max=clnt_sock;
                    printf("connect client: %d \n", clnt_sock);
                } else {  // 클라이언트 소켓에서 발생한 경우 
                    str_len = read(i, buf, BUFSIZE);
                    if(str_len==0){ // 읽을 데이터가 없으면 (닫기요청임)
                        FD_CLR(i, &reads); // 이 소켓을 삭제
                        close(i);
                        printf("close client: %d \n", i);
                    } else {
                        write(i, buf, str_len); // 에코메시지 전달
                    }
                }
            }
        }
    }
}

 

리눅스 - epoll

select 함수는 호출하기 전에 관찰할 fd를 fdset구조체에 직접 지정하고, 복사본으로 계속 호출해주면서 운영체제의 값을 세팅하지만 반복문을 통해 값을 직접 검사하여 변화를 확인하는 등 OS에 크게 의존적인 모델이 아니다. 

epoll은 운영체제가 대부분의 일을 도와주기 때문에 감시할 소켓을 운영체제에 알려준뒤 함수를 호출하면 운영체제가 변화를 감지한 경우 어떤 소켓이 변화했는지 프로그램에게 묶어서 전달해준다.

하지만 운영체제에 종속적이기 때문에 리눅스 전용 api이다. 그렇기 때문에 select 함수는 접속자수가 적고, 다양한 운영체제에서 사용되어야 할때 코드를 변경하지 않아도되는 장점이 있다. 

 

int epoll_create(int size) : epoll 인스턴스(fd 저장소)를 운영체제에 생성요청. 성공시 epoll fd반환

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) : 저장소에 파일 디스크립터를 등록하거나 삭제

 - epfd : epoll 인스턴스의 파일 디스크립터

 - op : EPOLL_CTL_ADD(추가), EPOLL_CTL_DEL(삭제), EPOLL_CTL_MOD(변경)

 - fd : 관찰 대상 소켓 파일디스크립터 

 - event : 어떤 이벤트를 관찰할 것인지 전달. 관찰소켓 삭제할땐 NULL을 전달함

int epoll_wait(int epfd, struct epoll_event * evnets, int maxevents, int timeout) : 파일 디스크립터의 변화를 대기

 - events : 이벤트가 발생한 경우 event구조체가 저장될 배열

 - maxevents : 동시에 처리될 최대 이벤트수. EPOLL_SIZE는 epoll 인스턴스의 크기이다. 

 - timeout : 1/1000초의 대기시간. -1 전달 시 무한대기

 - 리턴값 : 이벤트가 발생한 소켓 수

 

소켓 디스크립터를 등록할때와 이벤트 발생 시 확인할때 사용되는 구조체

struct epoll_event{
    __uint32_t events;
    epoll_data_t data;
}
typedef union epoll_data{
    void* ptr;
    int fd;
    __uint32_t u32;
    __uint64_t u64;
}

////////////////
struct epoll_event event;
event.events=EPOLLIN;
event.data.fd=sockfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &event);

int event_cnt;
struct epoll_event* ep_events;
ep_events=malloc(sizeof(struct epoll_event)*EPOLL_SIZE);

while(1){
    event_cnt=epoll_wait(epfd, ep_events, EPOLL_SIZE, -1);
    if(event_cnt==-1){
        puts("epoll_wait() error");
        break;
    }
    for(i=0; i<event_cnt; i++){
        if(ep_events[i].data.fd==serv_sock){
            adr_sz=sizeof(clnt_adr);
            /*...연결요청수락...*/
        } else {
            str_len=read(ep_events[i].data.fd, buf, BUF_SIZE);
            /*...데이터읽기...*/
        }
    }
}

| 연산자를 통해 여러 이벤트를 지정할 수 있다. 

EPOLLIN : 수신할 데이터가 존재하는 상황

EPOLLOUT : 출력버퍼가 비워져서 당장 데이터 전송이 가능한 상황

EPOLLPRI : OOB 데이터가 수신된 상황

EPOLLRDHUP : 연결이 종료되거나 Half-close가 진행된 상황. 엣지트리거에 사용됨

EPOLLERR : 에러가 발생된 상황

EPOLLET : 이벤트의 감지를 엣지 트리거 방식으로 동작시킨다.

EPOLLONESHOT : 이벤트가 감지되면 더이상 이벤트를 발생시키지 않는다. 재설정을 위해선 EPOLL_CTL_MOD사용 

 

어차피 감시할 fd를 3번째 인자로 전달하는데 event 구조체에 넣는 이유는 나중에 이벤트가 발생했을때 events 구조체를 그대로 전달해주는데 어떤 소켓에서 발생했는지 확인하기 위해 넣는것이다. 

 

서버의 소켓에 데이터가 들어오면 이벤트가 발생하게 되는데, TCP같은 경우에는 데이터의 경계가 없어서 데이터의 일부만 읽어올 수 도 있다. 그럼 소켓의 입력버퍼에는 데이터가 일부 남아있게 되는데, 이때에도 이벤트가 발생한다면 레벨트리거, 발생되지 않는다면 엣지트리거로 나뉜다. 

 

epoll - 레벨트리거 모델

상태가 초기와 다른 상태인 경우 계속 이벤트가 발생(입력버퍼에 데이터가 존재하는 중인 경우 발생)

레벨트리거 모델인 경우에는 입력버퍼를 한번에 비워주는 형태로 코드를 작성해야 이벤트가 자주 발생되지 않아서 epoll 함수도 자주 발생되지 않고 결국엔 리소스 낭비를 줄여줄 수 있다. 

위에 작성된 코드는 레벨트리거이다. 

 

epoll - 엣지트리거 모델

상태가 변화될 때 이벤트가 발생(입력버퍼에 데이터가 없다가 생겼을때) 레벨트리거보다 구현이 좀더 복잡하기 때문에 레벨트리거로만 해결된다면 레벨트리거로 작성하는게 좋다.

 

데이터 수신시 딱 한번만 이벤트가 발생하기 때문에 이벤트가 발생했을때 모든 데이터를 다 읽거나, 버퍼가 남아있는 소켓을 기억하고있어야한다. 

#include <error.h> // errno라는 전역변수값 포함
int errno	// errno 변수가 EAGAIN 값이 저장되면 버퍼가 비어있는 상태이다. 
		// 데이터 수신이 완료되었는지 확인하기 위해 참조한다.

int flag=fcntl(fd, F_GETFL, 0);		// fd 소켓의 flag값을 가져온다
fcntl(fd, F_SETFL, flag|O_NONBLOCK);	// fd 소켓에 기존 flag값 | O_NONBLOCK으로 속성을 추가한다

레벨트리거와 엣지트리거의 epoll 설정은 NONBLOCK 모드 외에는 다를것이 없다.

else{
    while(1){
        str_len=read(ep_events[i].data.fd, buf, BUF_SIZE);
        if(str_len==0){
            epoll_ctl(epfd, EPOLL_CTL_DEL, ep_events[i].data.fd, NULL);
            close(ep_events[i].data.fd);
            printf("closed client: %d \n", ep_events[i].data.fd);
            break;
        } else if(str_len<0){
            if(errno == EAGAIN)
                break;
        } else {
            write(ep_events[i].data.fd, buf, str_len);
        }
    }
}

while 문이 사용된 이유는 모든 한소켓의 모든 데이터를 읽어들이기 위함이고, 서버 모델에 따라 달라질 수 있다.

 

 

 

윈도우 - iocp