How to check proxies asynchronously in python?

Question:

How to check proxies asynchronously in python ? I took the proxy-checker library as an example and wrote the code with the parameters I needed. It is completely working, but one check takes from 10 to 20 seconds and since the method is not asynchronous, during execution on PyQt5 interface PyQt5 and hangs for a second only between each check.

Is it possible to rewrite this code with asynchrony or maybe there are other, ready-made, asynchronous methods for checking proxies?

I've tried using:

loop = asyncio.get_event_loop()
res = await loop.run_in_executor(None, ..., ...)

The interface did not hang, but for some reason the execution results were not returned, and besides, this is not a direct solution.

Here is all my code:

import re
import time
import pycurl
import random
from io import BytesIO


proxy_judges = ['http://proxyjudge.us/azenv.php', 'http://mojeip.net.pl/asdfa/azenv.php']


def send_query(proxy=False, url=None, user=None, password=None):
    response = BytesIO()
    c = pycurl.Curl()

    c.setopt(c.URL, url or random.choice(proxy_judges))
    c.setopt(c.WRITEDATA, response)
    c.setopt(c.TIMEOUT, 5)

    if user is not None and password is not None:
        c.setopt(c.PROXYUSERPWD, f"{user}:{password}")

    c.setopt(c.SSL_VERIFYHOST, 0)
    c.setopt(c.SSL_VERIFYPEER, 0)

    if proxy:
        c.setopt(c.PROXY, proxy)

    try:
        c.perform()
    except:
        return False

    if c.getinfo(c.HTTP_CODE) != 200:
        return False

    timeout = round(c.getinfo(c.CONNECT_TIME) * 1000)
    response = response.getvalue().decode('iso-8859-1')

    return {
        'timeout': timeout,
        'response': response
    }


def get_country(ip):
    r = send_query(url='https://ip2c.org/' + ip)
    if r and r['response'][0] == '1':
        r = r['response'].split(';')
        return [r[3], r[1]]
    return ['-', '-']


def check_proxy(proxy, check_country=True, check_address=False, user=None, password=None):
    timeout, protocols = 0, {}

    for protocol in ['http', 'socks4', 'socks5']:
        r = send_query(proxy=protocol + '://' + proxy, user=user, password=password)
        if not r:
            continue
        protocols[protocol] = r
        timeout += r['timeout']

    if len(protocols) == 0:
        return False

    r = protocols[random.choice(list(protocols.keys()))]['response']
    timeout = timeout // len(protocols)

    results = {
        'protocols': list(protocols.keys()),
        'timeout': timeout
    }

    if check_country:
        country = get_country(proxy.split(':')[0])
        results['country'] = country[0]
        results['country_code'] = country[1]

    if check_address:
        remote_regex = r'REMOTE_ADDR = (\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
        remote_addr = re.search(remote_regex, r)
        if remote_addr:
            remote_addr = remote_addr.group(1)
        results['remote_address'] = remote_addr

    return results


if __name__ == '__main__':

    all_proxies = open('proxies.txt', 'r', encoding='utf-8').read().split('\n')

    for proxy in all_proxies:
        cur_proxy = proxy.split(':')
        ip_port = cur_proxy[0] + ':' + cur_proxy[1]
        proxy_log, proxy_pass = cur_proxy[2], cur_proxy[3]

        start_time = time.time()
        res = check_proxy(ip_port, check_country=True, user=proxy_log, password=proxy_pass)

        if res:
            protocol = str(res['protocols'][0])
            timeout = str(res['timeout'])
            country = str(res['country'])
            code = str(res['country_code'])
            print("Прошло %s секунд" % (round(time.time() - start_time)))
            print(f'{proxy}: timeout {timeout} ({protocol}) | {country} ({code})')

Output example:

Прошло 15 секунд
...: timeout 86 (http) | Russian Federation (RU)
Прошло 16 секунд
...: timeout 89 (http) | - (-)
Прошло 12 секунд
...: timeout 87 (http) | Russian Federation (RU)
Прошло 13 секунд
...: timeout 83 (http) | Russian Federation (RU)

Answer:

Since you put PyQt5 in the tags, we will use it. More specifically, QThread:

import sys
import time
from PyQt5 import QtCore, QtWidgets
from PyQt5.QtCore import QObject, QThread, pyqtSignal
# ваш код прокси-чекера из вопроса пусть лежит в proxyChecker.py
from proxyChecker import check_proxy


# этот объект будет крутиться в отдельном потоке
class Worker(QObject):

    # у объекта будут два сигнала - поток завершен и сигнал с результатом проверки очередного прокси
    finished = pyqtSignal()
    progress = pyqtSignal(str)
    running = False

    # это будем использовать если надо будет остановить проверку
    def stop(self):
        self.running = False

    # это основной цикл проверки прокси
    def run(self):
        self.running = True
        all_proxies = open('proxies.txt', 'r', encoding='utf-8').read().split('\n')
        for proxy in all_proxies:
            self.progress.emit(f'Проверяем: {proxy}')
            cur_proxy = proxy.split(':')
            ip_port = cur_proxy[0] + ':' + cur_proxy[1]
            proxy_log, proxy_pass = cur_proxy[2], cur_proxy[3]

            start_time = time.time()
            res = check_proxy(ip_port, check_country=True, user=proxy_log, password=proxy_pass)
            # отправляем в основной поток инфо р результате тестирования
            if res:
                protocol = str(res['protocols'][0])
                timeout = str(res['timeout'])
                country = str(res['country'])
                code = str(res['country_code'])
                result = f"Прошло {round(time.time() - start_time)} секунд\n"
                result += f'{proxy}: timeout {timeout} ({protocol}) | {country} ({code})'
                self.progress.emit(result)
            else:
                result = f"Прошло {round(time.time() - start_time)} секунд\n"
                result += f'Недоступен'
                self.progress.emit(result)
            if not self.running:
                break
        # говорим основному потоку что мы отработали
        self.finished.emit()


class Ui_Form(object):
    def setupUi(self, Form):
        Form.setObjectName("Form")
        Form.resize(453, 408)
        self.verticalLayout = QtWidgets.QVBoxLayout(Form)
        self.verticalLayout.setObjectName("verticalLayout")
        self.verticalLayout_2 = QtWidgets.QVBoxLayout()
        self.verticalLayout_2.setObjectName("verticalLayout_2")
        self.textBrowser = QtWidgets.QTextBrowser(Form)
        self.textBrowser.setObjectName("textBrowser")
        self.verticalLayout_2.addWidget(self.textBrowser)
        self.verticalLayout.addLayout(self.verticalLayout_2)
        self.horizontalLayout = QtWidgets.QHBoxLayout()
        self.horizontalLayout.setObjectName("horizontalLayout")
        spacerItem = QtWidgets.QSpacerItem(40, 20, QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum)
        self.horizontalLayout.addItem(spacerItem)
        self.pushButton = QtWidgets.QPushButton(Form)
        self.pushButton.setObjectName("pushButton")
        self.horizontalLayout.addWidget(self.pushButton)
        self.pushButtonCancel = QtWidgets.QPushButton(Form)
        self.pushButtonCancel.setObjectName("pushButton2")
        self.pushButtonCancel.setEnabled(False)
        self.horizontalLayout.addWidget(self.pushButtonCancel)
        spacerItem1 = QtWidgets.QSpacerItem(40, 20, QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Minimum)
        self.horizontalLayout.addItem(spacerItem1)
        self.verticalLayout.addLayout(self.horizontalLayout)
        self.retranslateUi(Form)
        QtCore.QMetaObject.connectSlotsByName(Form)

    def retranslateUi(self, Form):
        _translate = QtCore.QCoreApplication.translate
        Form.setWindowTitle(_translate("Form", "Example"))
        self.pushButton.setText(_translate("Form", "Start"))
        self.pushButtonCancel.setText(_translate("Form", "Stop"))



class MyWindow(QtWidgets.QWidget):
    def __init__(self, parent=None):
        super().__init__()
        self.ui = Ui_Form()
        self.ui.setupUi(self)
        # цепляем кнопочки к функциям запуска/остановки проверки
        self.ui.pushButton.clicked.connect(self.runLongTask)
        self.ui.pushButtonCancel.clicked.connect(self.breakLongTask)

    # функция будет отрабатывать когда поток проверки пришлет строку с результатом
    def reportProgress(self, n):
        self.ui.textBrowser.append(n)

    def breakLongTask(self):
        self.worker.stop()

    # при нажатии кнопы запуска надо создать поток проверки
    def runLongTask(self):
        # сделаем активной кнопку остановки проверки
        self.ui.pushButtonCancel.setEnabled(True)
        # создадим объект QThread
        self.thread = QThread()
        # создадим объект, который будет крутиться созданном в потоке
        self.worker = Worker()
        # запихиваем наш объект в новый поток
        self.worker.moveToThread(self.thread)
        # делаем обвязку сигналов:
        # Функция, вызываемая при запуске потока
        self.thread.started.connect(self.worker.run)
        # при получении от потока сигнала с результатом, обработаем его в reportProgress
        self.worker.progress.connect(self.reportProgress)
        # действия при завершении потока:
        # а) выйти из потока (завершить поток)
        self.worker.finished.connect(self.thread.quit)
        # б) запланировать удаление объекта, который крутился в потоке
        self.worker.finished.connect(self.worker.deleteLater)
        # в) запланировать удаление самого объекта QThread
        self.thread.finished.connect(self.thread.deleteLater)

        # Наш поток готов к запуску. Стартуем
        self.thread.start()

        # Пока поток работает, кнопка запуска будет недоступна
        self.ui.pushButton.setEnabled(False)

        # После завершения потока сделаем кнопку запуска активной а кнопку остановки неактивной
        self.thread.finished.connect(
            lambda: (
                self.ui.textBrowser.append("finished"),
                self.ui.pushButtonCancel.setEnabled(False),
                self.ui.pushButton.setEnabled(True)
                     )
        )


if __name__ == '__main__':
    app = QtWidgets.QApplication(sys.argv)
    window = MyWindow()
    window.show()
    sys.exit(app.exec())
Scroll to Top
AllEscort