from asyncio import CancelledError, Queue, create_task, gather, sleep
from contextlib import asynccontextmanager, suppress
from json import dumps, loads
from logging import INFO, basicConfig, getLogger
from pathlib import Path
from typing import Literal

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse, PlainTextResponse
from proxybroker import Broker
from uvicorn import run as uvicorn_run
| |
|
| |
|
| | scheduler = AsyncIOScheduler() |
| |
|
# Resolve the working directory next to this source file.  __file__ is the
# only name here that can realistically be missing (interactive/frozen
# environments), so catch exactly that instead of a bare except, which would
# also hide genuine errors like KeyboardInterrupt.
try:
    workdir = Path(__file__).parent
except NameError:
    workdir = Path.cwd().parent
| |
|
# All runtime artifacts (log file, JSON dumps) live inside workdir.
logfile = workdir / 'log.log'


# Append-mode file logging; the FileHandler opened here keeps logfile open
# for the whole process lifetime (relevant for delete_logs below).
basicConfig(
    level=INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename=str(logfile),
    filemode='a'
)
logger = getLogger('proxy_collector')
logger.info('запуск...')
| |
|
| |
|
def delete_logs():
    """Clear the log file in place.

    The file is truncated rather than unlinked: the logging FileHandler
    (configured with filemode 'a') keeps its descriptor open, so unlinking
    would send every later record to a deleted inode and the visible log file
    would never be recreated.  Truncating keeps the handler working, since
    O_APPEND writes simply resume at the new end of file.
    """
    try:
        # Opening with 'w' truncates the existing file without replacing it.
        with open(logfile, 'w'):
            pass
    except OSError as e:
        logger.error(f'ошибка при удалении логов: {e}')
| |
|
| |
|
# True only until the first collection pass finishes; the first pass runs with
# a shorter timeout so the API has data sooner (see find_proxies).
is_first_run = True


# Per-protocol output files plus the merged document served by the API.
http_collected_json = workdir / 'http_proxies.json'
https_collected_json = workdir / 'https_proxies.json'
socks5_collected_json = workdir / 'socks5_proxies.json'
collected_json = workdir / 'proxies.json'


# ISO 3166-1 alpha-2 country codes the broker may source proxies from.
countries_list = ['US', 'CA', 'FR', 'FI', 'HR', 'ME', 'CH', 'SE', 'EE', 'DE', 'GB', 'IT', 'NL', 'PL', 'CZ', 'RS', 'RO', 'MD', 'AT', 'BE', 'BG', 'HU', 'DK', 'IS', 'KZ', 'LV', 'LT', 'LU', 'NO', 'PT', 'SK', 'SI']
| |
|
| |
|
def create_json_from_proxies(proxy_lines: list[str], filename: Path):
    """Parse proxybroker repr lines and persist them as a JSON document.

    Each line is expected to look like
    ``<Proxy US 0.23s [HTTP: High] 1.2.3.4:8080>``; when the protocol field
    carries an anonymity level ("[HTTP: High]") it splits into two tokens and
    the host shifts one token to the right.  Malformed lines are logged and
    skipped instead of aborting the whole dump (the original code raised
    IndexError/ValueError on the first bad line).

    Returns *filename* for chaining.
    """
    logger.info('сохранение файла прокси')
    countries = set()
    proxies = []

    for line in proxy_lines:
        parts = line.split()
        try:
            country = parts[1]
            ping = float(parts[2].strip('s'))
            protocol = parts[3].strip('[]')
            host = parts[4].rstrip('>')

            if "HTTP:" in protocol:
                # "[HTTP: High]" case - protocol token kept the colon and the
                # host is one token further along.
                protocol = "HTTP"
                host = parts[5].rstrip(']>')
        except (IndexError, ValueError) as e:
            logger.error(f'не удалось разобрать строку прокси {line!r}: {e}')
            continue

        countries.add(country)
        proxies.append({"country": country, "ping": ping, "protocol": protocol, "host": host})

    data = {
        'countries': sorted(countries),
        'proxies': proxies
    }
    filename.write_text(dumps(data, indent=4))
    return filename
| |
|
| |
|
async def collect_proxies(proxies_queue: Queue):
    """Drain *proxies_queue* until the None sentinel and return string forms.

    Each queued proxy object is stringified; the terminating None is consumed
    but not included in the result.
    """
    gathered = []
    while (item := await proxies_queue.get()) is not None:
        gathered.append(str(item))
    return gathered
| |
|
| |
|
async def sort_proxies_and_merge(files: list[Path], output_file: Path):
    """Merge per-protocol proxy JSON files into one document.

    For every existing, non-empty input file the proxies are sorted by ping
    and grouped under their lower-cased protocol name (taken from the first
    entry, since each file holds a single protocol).  The merged document
    carries a sorted union of all countries plus a sorted per-type list.

    Fixes vs. the original: a missing/None "ping" no longer raises TypeError
    during sorting (such entries sort last), and the per-type country lists
    are sorted so the output is deterministic.

    Returns *output_file* for chaining.
    """
    logger.info('объединение файлов прокси')
    all_countries = set()
    proxies_by_type = {}
    for file in files:
        if not (file.is_file() and file.stat().st_size > 0):
            continue
        data = loads(file.read_text(encoding='utf-8'))
        proxies = data.get('proxies')
        if not proxies:
            continue
        protocol = proxies[0].get('protocol')
        if not protocol:
            continue
        proxy_type = protocol.lower()
        # Entries without a numeric ping sort to the end instead of raising.
        sorted_proxies = sorted(
            proxies,
            key=lambda p: p['ping'] if isinstance(p.get('ping'), (int, float)) else float('inf'),
        )
        type_countries = sorted({p.get('country') for p in sorted_proxies if p.get('country')})
        proxies_by_type[proxy_type] = {
            'countries': type_countries,
            'proxies': sorted_proxies,
        }
        all_countries.update(type_countries)
    merged_data = {'countries': sorted(all_countries), 'proxies_by_type': proxies_by_type}
    output_file.write_text(dumps(merged_data, indent=4))
    return output_file
| |
|
| |
|
async def stop_broker_after_timeout(broker: "Broker", timeout_minutes: int):
    """Sleep for *timeout_minutes* minutes, then stop *broker*.

    The stop call is wrapped defensively: by the time the timer fires the
    broker may already be stopped or closed, and a failing stop() must not
    crash the surrounding gather.  The original bare ``except:`` also
    swallowed BaseException (e.g. CancelledError); narrowed to Exception so
    task cancellation still propagates.
    """
    await sleep(timeout_minutes * 60)
    try:
        broker.stop()
    except Exception:
        # Broker already finished - nothing useful to do here.
        pass
| |
|
| |
|
async def find_proxies_by_type(proxy_type: Literal['HTTP', 'HTTPS', 'SOCKS5'], output_json_file: Path, timeout_minutes: int = 50):
    """Collect proxies of one type for up to *timeout_minutes*, dump to JSON.

    A watchdog task stops the broker when the timeout elapses; with limit=0
    find() runs until the broker is stopped, then puts the None sentinel on
    the queue, which collect_proxies uses as end-of-stream.

    Fix vs. the original: if find() returns (or raises) before the timeout,
    ``await stop_task`` used to block for the remaining timeout; the timer is
    now cancelled instead.

    Returns the Path written by create_json_from_proxies.
    """
    logger.info(f'начат сбор прокси {proxy_type}')
    # Publish an empty placeholder immediately so API readers see valid JSON.
    output_json_file.write_text(dumps({'countries': None, 'proxies': []}, indent=4))
    proxies_queue = Queue()
    broker = Broker(proxies_queue, timeout=8, max_conn=200, max_tries=3, verify_ssl=False)
    stop_task = create_task(stop_broker_after_timeout(broker, timeout_minutes))
    try:
        await broker.find(types=[proxy_type], countries=countries_list, limit=0)
    finally:
        # Stop waiting on the watchdog once find() has finished either way.
        stop_task.cancel()
        with suppress(CancelledError):
            await stop_task
    proxies_list = await collect_proxies(proxies_queue)
    saved_proxy = create_json_from_proxies(proxies_list, output_json_file)
    logger.info(f'завершён сбор прокси {proxy_type}')
    return saved_proxy
| |
|
| |
|
async def find_proxies():
    """Run one collection pass for every proxy type, then merge the results.

    The very first pass uses a short timeout so the API has data quickly;
    subsequent scheduled passes run the full-length collection.  The
    first-run flag is only cleared after a fully successful pass, so a failed
    first pass retries with the short timeout.
    """
    global is_first_run
    timeout_minutes = 10 if is_first_run else 50
    # (f-prefixes removed from placeholder-less literals - F541)
    logger.info('запущены задачи по сбору всех типов прокси')
    results = await gather(
        find_proxies_by_type('HTTP', http_collected_json, timeout_minutes),
        find_proxies_by_type('HTTPS', https_collected_json, timeout_minutes),
        find_proxies_by_type('SOCKS5', socks5_collected_json, timeout_minutes)
    )
    await sort_proxies_and_merge(list(results), collected_json)
    is_first_run = False
    logger.info('задачи по сбору прокси завершены')
| |
|
| |
|
# Re-collect proxies every hour; wipe the log once a day (1440 minutes).
scheduler.add_job(find_proxies, 'interval', max_instances=1, minutes=60)
scheduler.add_job(delete_logs, 'interval', max_instances=1, minutes=1440)
| |
|
| |
|
@asynccontextmanager
async def app_lifespan(app: FastAPI):
    """FastAPI lifespan: start the scheduler and the first collection pass.

    On shutdown the in-flight collection task is cancelled rather than
    awaited - the original ``await task`` could block shutdown for up to the
    full collection timeout (~1 hour).  try/finally guarantees cleanup even
    if the application body raises.
    """
    scheduler.start()
    task = create_task(find_proxies())
    try:
        yield
    finally:
        task.cancel()
        with suppress(CancelledError):
            await task
        scheduler.shutdown()
| |
|
| |
|
| | app = FastAPI(lifespan=app_lifespan) |
| |
|
| |
|
def not_redy_yet():
    """Placeholder response served while the proxy JSON files do not exist yet."""
    payload = {"error": "ёще не готово, сбор и проверка прокси занимает около часа"}
    return JSONResponse(payload, status_code=200)
| |
|
| |
|
@app.post('*')
async def reject_post():
    """Reject POST requests with 405 Method Not Allowed.

    Fixes vs. the original: the exception is raised, not returned - returning
    HTTPException made FastAPI serialize the exception object as JSON with
    status 200.  Also renamed from ``read_root``, which was defined twice in
    this module (the later GET handler shadowed this one's name).

    NOTE(review): the path '*' is matched literally by FastAPI/Starlette, not
    as a wildcard - confirm whether a catch-all ('/{path:path}') was intended.
    """
    raise HTTPException(status_code=405)
| |
|
| |
|
@app.get('/all/')
async def get_proxies():
    """Serve the merged proxy document, or the not-ready placeholder."""
    if not collected_json.exists():
        return not_redy_yet()
    return loads(collected_json.read_text())
| |
|
| |
|
@app.get('/http/')
async def get_http_proxies():
    """Serve the HTTP proxy list, or the not-ready placeholder."""
    if not http_collected_json.exists():
        return not_redy_yet()
    return loads(http_collected_json.read_text())
| |
|
| |
|
@app.get('/https/')
async def get_https_proxies():
    """Serve the HTTPS proxy list, or the not-ready placeholder."""
    if not https_collected_json.exists():
        return not_redy_yet()
    return loads(https_collected_json.read_text())
| |
|
| |
|
@app.get('/socks5/')
async def get_socks5_proxies():
    """Serve the SOCKS5 proxy list, or the not-ready placeholder."""
    if not socks5_collected_json.exists():
        return not_redy_yet()
    return loads(socks5_collected_json.read_text())
| |
|
| |
|
@app.get('/log/')
async def get_logs():
    """Return the raw log file as plain text (or a 'log empty' notice)."""
    if not logfile.exists():
        return PlainTextResponse('лог пуст', status_code=200)
    return PlainTextResponse(logfile.read_text(encoding='utf-8'), status_code=200)
| |
|
| |
|
@app.get('/')
async def read_root():
    """Landing / liveness endpoint."""
    greeting = 'ну пролапс, ну и что'
    return PlainTextResponse(greeting, status_code=200)
| |
|
| |
|
| | if __name__ == "__main__": |
| | uvicorn_run(app, host='0.0.0.0', port=7860, timeout_keep_alive=90) |
| |
|