سكريبت DevOps يتحقق من صحة 200 خدمة بشكل تسلسلي ينتظر كل استجابة HTTP قبل إطلاق الطلب التالي. بمتوسط زمن استجابة 100 ميلي ثانية، هذا يعني 20 ثانية من الوقت الفعلي — تُقضى تقريباً كلها في الانتظار على I/O الشبكة، بينما المعالج خامل. أضف التزامن وتنتهي نفس العملية في أقل من ثانية. يُعلّم هذا الدرس نموذجَي التزامن اللذين تقدمهما Python، ومتى يكون كل منهما الخيار الصحيح، والمزالق الإنتاجية التي تُسقط حتى المهندسين ذوي الخبرة.
النموذجان: Threading مقابل asyncio
قفل المترجم العالمي (GIL) في Python يعني أنه في أي لحظة يُنفّذ خيط واحد فقط بايت كود Python. للعمل المُكثَّف على المعالج (الحسابات الرقمية، الضغط) يجعل هذا الخيوط غير مفيدة للتوازي — استخدم multiprocessing أو عمليات فرعية بدلاً من ذلك. للعمل المُكثَّف على I/O — الغالبية العظمى من مهام العمليات (استدعاءات HTTP، استدعاءات SDK، بحث DNS، انتظار العمليات الفرعية) — يُحرَّر GIL بينما الخيط محجوب على I/O، لذا تعمل خيوط متعددة بشكل متزامن فعلياً.
Threading (concurrent.futures.ThreadPoolExecutor): كل وحدة عمل تعمل في خيط OS خاص بها. بسيط التبني لأن الكود المتزامن الموجود (استدعاءات boto3، requests.get) يعمل بدون أي تغييرات. الأفضل لعمل fan-out حيث تستدعي عمليات I/O مستقلة كثيرة وتجمع النتائج.
asyncio: حلقة أحداث أحادية الخيط تتنقل بين الكوروتينات كلما انتظرت إحداها على I/O. حمل أقل لكل مهمة (لا حزمة خيط OS)، لكن يتطلب مكتبات async-native (aiohttp، aiobotocore). الأفضل عندما تحتاج آلاف الاتصالات المتزامنة أو تحتاج إلغاء وانتهاء مهلة دقيقَين.
القاعدة الأساسية في الشركات الكبرى: لمعظم سكريبتات العمليات التي تتوسع عبر عشرات إلى بضع مئات من الخدمات أو موارد AWS، ThreadPoolExecutor مع حوض محدود (8–32 عاملاً) هو الافتراضي الصحيح. مقروء وقابل للتصحيح ولا يتطلب مكتبات تدعم async. الجأ إلى asyncio عندما تحتاج إلى آلاف الاتصالات المفتوحة المتزامنة — مولّد حِمل، متتبع سجل في الوقت الحقيقي، أو عميل مراقبة قائم على WebSocket.
استدعاءات API المتوازية مع ThreadPoolExecutor
النمط المعتمد: أرسل جميع المهام، اجمع الـ futures، وكرّر على الاكتمالات. دالة as_completed تُعطي الـ futures حين تنتهي بدلاً من ترتيب الإرسال، مما يتيح لك معالجة النتائج لحظة وصولها.
import concurrent.futures
import requests
import logging
import time
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger(__name__)
ENDPOINTS = [
"https://api.example.com/health",
"https://api.example.com/metrics",
"https://auth.example.com/health",
"https://payments.example.com/health",
"https://notifications.example.com/health",
]
def check_health(url: str, timeout: int = 5) -> dict:
"""إرجاع قاموس نتيجة الصحة؛ لا ترمي استثناءات أبداً — يعتمد عليها المستدعون."""
start = time.monotonic()
try:
resp = requests.get(url, timeout=timeout)
elapsed = time.monotonic() - start
return {
"url": url,
"status": resp.status_code,
"ok": resp.status_code == 200,
"latency_ms": round(elapsed * 1000),
}
except requests.exceptions.Timeout:
return {"url": url, "ok": False, "error": "timeout"}
except requests.exceptions.RequestException as exc:
return {"url": url, "ok": False, "error": str(exc)}
def fan_out_health_check(urls: list[str], workers: int = 10) -> list[dict]:
results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
# أرسل جميع المهام فوراً — يُرجع كائنات Future
future_to_url = {pool.submit(check_health, url): url for url in urls}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
result = future.result()
results.append(result)
status = "OK" if result.get("ok") else "FAIL"
log.info("%s %s (%s ms)", status, url, result.get("latency_ms", "-"))
except Exception as exc:
log.error("Unexpected error checking %s: %s", url, exc)
results.append({"url": url, "ok": False, "error": str(exc)})
return results
if __name__ == "__main__":
results = fan_out_health_check(ENDPOINTS)
failed = [r for r in results if not r["ok"]]
if failed:
log.error("%d endpoint(s) unhealthy: %s", len(failed), failed)
raise SystemExit(1)
log.info("All %d endpoints healthy", len(results))
ممارسة احترافية: امسك الاستثناءات دائماً داخل دالة العامل وأرجع قاموس خطأ منظَّم بدلاً من السماح للاستثناءات بالانتشار عبر الـ future. هكذا لا تتعطل حلقة fan-out في المنتصف حين يكون أحد الأهداف الخمسين غير قابل للوصول. احتفظ بالتقاط استثناء future.result() كشبكة أمان للأخطاء غير المتوقعة حقاً، لا لأخطاء الشبكة المتوقعة.
اختيار حجم الحوض المناسب
تحديد max_workers قرار ضبط دقيق، لا مقياساً "كلما أكبر كان أفضل". صغر جداً وتترك التوازي على الطاولة. كبر جداً وتستنزف واصفات الملفات، أو تصطدم بحدود اتصال TCP لكل مضيف، أو يُقيّدك API البعيد بمعدل طلبات.
لـ APIs HTTP الخارجية: معظم APIs SaaS تفرض حدوداً لكل عميل (مثلاً 100 طلب/ثانية). ابدأ بـ 10–20 عاملاً وراقب معدل 429. تراجع بشكل أسي عند 429s بدلاً من إضافة خيوط.
لاستدعاءات AWS SDK (boto3): حوض الاتصال الافتراضي لكل Session هو 10. مع 50 خيطاً يتشاركون جلسة واحدة ستحصل على ConnectionError بسبب استنزاف الحوض. إما اضبط max_pool_connections في botocore.config.Config ليتطابق مع عدد العمال، أو أنشئ جلسة واحدة لكل خيط باستخدام threading.local().
للخدمات الداخلية / فحوصات الصحة: يمكن أن يكون العمال 50–100 إذا كانت الأهداف على شبكة داخلية منخفضة الزمن وتملك الخدمات المستهدفة.
توسع ThreadPoolExecutor: تُرسَل جميع المهام فوراً؛ as_completed تُعطي النتائج مع انتهاء كل عامل، لا بترتيب الإرسال.
asyncio للعمل عالي التوسع
حين يصل عدد أهداف التوسع إلى المئات أو الآلاف، يصبح الحمل الناتج عن خيوط OS ذا أهمية. كل خيط يستهلك افتراضياً ~8 ميغابايت من المكدس؛ 1000 خيط = 8 غيغابايت RAM قبل كتابة بايت واحد من بيانات التطبيق. كوروتينات asyncio رخيصة — آلاف منها تتعايش في بضع ميغابايتات. المقايضة: كل مكتبة في سلسلة الاستدعاء يجب أن تكون async-native.
import asyncio
import aiohttp
import logging
log = logging.getLogger(__name__)
ENDPOINTS = [f"https://internal-svc-{i}.prod.example.com/health" for i in range(1, 201)]
async def check_one(session: aiohttp.ClientSession, url: str) -> dict:
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as resp:
return {"url": url, "ok": resp.status == 200, "status": resp.status}
except asyncio.TimeoutError:
return {"url": url, "ok": False, "error": "timeout"}
except aiohttp.ClientError as exc:
return {"url": url, "ok": False, "error": str(exc)}
async def fan_out(urls: list[str], concurrency: int = 50) -> list[dict]:
# Semaphore يُحدّ الاتصالات المفتوحة بغض النظر عن عدد المهام
sem = asyncio.Semaphore(concurrency)
async def guarded(url: str) -> dict:
async with sem:
return await check_one(session, url)
connector = aiohttp.TCPConnector(limit=concurrency)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [asyncio.create_task(guarded(url)) for url in urls]
return await asyncio.gather(*tasks)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
results = asyncio.run(fan_out(ENDPOINTS))
failed = [r for r in results if not r["ok"]]
log.info("Checked %d endpoints. Failures: %d", len(results), len(failed))
if failed:
for f in failed:
log.error("FAIL %s: %s", f["url"], f.get("error", f.get("status")))
raise SystemExit(1)
مصيدة إنتاجية — gather غير محدود:asyncio.gather(*[task for task in 10_000_tasks]) ينشئ 10,000 كوروتين في وقت واحد. حتى لو كانت الكوروتينات خفيفة، ترى الخدمات البعيدة قطيعاً متزاحماً من الاتصالات. ثبّت دائماً بـ asyncio.Semaphore واضبط limit مطابقاً على TCPConnector. بدون هذا، ستضرب EMFILE على مستوى النواة (ملفات مفتوحة كثيرة جداً) أو يحظرك الخدمة البعيدة.
مزج boto3 مع حوض الخيوط
boto3 ليست async-native، لذا النمط الصحيح لاستدعاءات AWS API المتوازية هو ThreadPoolExecutor. تفصيل حرج: كائنات boto3 Sessionليست آمنة للخيوط. جلسة واحدة مع عميل محلي للخيط هو الحل المعتمد داخل أدوات AWS نفسها.
import boto3
import concurrent.futures
import threading
import logging
log = logging.getLogger(__name__)
# تخزين محلي للخيط: كل خيط يحصل على عميل EC2 خاص به
_local = threading.local()
def get_ec2_client(region: str) -> boto3.client:
if not hasattr(_local, "clients"):
_local.clients = {}
if region not in _local.clients:
session = boto3.Session()
_local.clients[region] = session.client(
"ec2",
region_name=region,
config=boto3.session.Config(max_pool_connections=1),
)
return _local.clients[region]
def get_region_instance_count(region: str) -> dict:
try:
ec2 = get_ec2_client(region)
paginator = ec2.get_paginator("describe_instances")
total = sum(
len(r["Instances"])
for page in paginator.paginate(Filters=[{"Name": "instance-state-name", "Values": ["running"]}])
for r in page["Reservations"]
)
return {"region": region, "running": total}
except Exception as exc:
log.error("Failed to query %s: %s", region, exc)
return {"region": region, "error": str(exc)}
REGIONS = ["us-east-1", "us-west-2", "eu-west-1", "ap-southeast-1", "ap-northeast-1"]
with concurrent.futures.ThreadPoolExecutor(max_workers=len(REGIONS)) as pool:
results = list(pool.map(get_region_instance_count, REGIONS))
for r in results:
log.info("Region %s: %s running instances", r["region"], r.get("running", "ERROR"))
انتهاء المهلة والإلغاء
أي سكريبت عمليات متزامن يجب أن يملك موعداً نهائياً للوقت الكلي. هدف واحد بطيء أو معلّق يجب ألا يُعيق التشغيل بأكمله إلى الأبد. الأنماط تختلف بين النموذجين:
ThreadPoolExecutor: استخدم concurrent.futures.wait(fs, timeout=30) للحصول على زوج (done, not_done) بعد 30 ثانية. استدعِ future.cancel() على مجموعة not_done — لاحظ أن الإلغاء يعمل فقط للمهام التي لم تبدأ التنفيذ بعد. للمهام التي تعمل بالفعل، النهج الآمن الوحيد هو التعاوني: مرّر threading Event للعامل وتحقق منه دورياً.
asyncio: لفّ أي كوروتين بـ asyncio.wait_for(coro, timeout=5.0). يرمي asyncio.TimeoutError ويلغي المهمة الأساسية بنظافة. للدفعة: asyncio.gather(*tasks, return_exceptions=True) يضمن جمع جميع النتائج (بما فيها الاستثناءات) دون أن يُجهض فشل واحد الباقي.
ممارسة احترافية: في سكريبتات التنبيه الإنتاجية، اضبط موعداً نهائياً على مستوى السكريبت بالكامل (signal.alarm على Linux أو asyncio.wait_for حول استدعاء fan_out بأكمله) بالإضافة إلى مهلات كل طلب على حدة. هذا يمنع تشغيل السكريبت إلى الأبد إذا كانت آلية التزامن نفسها تحتوي خطأ — سيناريو أكثر شيوعاً مما تتوقع خلال أقسام الشبكة.
متى تتجنب التزامن
التزامن يُضيف سطح تصحيح أخطاء. سكريبت تسلسلي بتسجيل واضح أسهل في التشغيل من سكريبت متزامن. قبل اللجوء إلى الخيوط أو asyncio، اسأل:
هل وقت التشغيل التسلسلي الإجمالي مشكلة فعلاً؟ إذا انتهت دفعة ليلية في 4 دقائق، إضافة التزامن من أجله يُدخل مخاطر بدون فائدة مرئية للمستخدم.
هل للخدمة المستهدفة حدود معدل طلبات تجعل التزامن مضاداً للإنتاجية؟ ضرب حد 10 طلبات/ثانية بـ 50 خيطاً سيُنتج فيضاناً من 429s وإعادة محاولات.
هل العملية ذات حالة بطريقة تجعل الطفرات المتزامنة خطرة؟ الكتابة المتزامنة على نفس مفتاح S3، نفس صف قاعدة البيانات، أو نفس مورد Kubernetes يتطلب قفلاً موزعاً — مشكلة أصعب بكثير مما بدأت به.
الافتراضي الإنتاجي: في شركات كـ Google وStripe، تستخدم أدوات العمليات عادةً أحواض خيوط بعدد عمال متواضع (8–16) لتوسع عبر الخدمات، و asyncio فقط في مكونات pipeline عالية الإنتاجية (معالجو سجلات البث، محركات تقييم التنبيه). الاختيار يُحرَّك بمتطلبات زمن الاستجابة القابلة للقياس، لا بالجماليات. قِس قبل أن تُحسّن.
نستخدم ملفات تعريف الارتباط لتشغيل هذا الموقع وتحليل الزيارات وعرض إعلانات مخصّصة. يمكنك قبول كل ملفات تعريف الارتباط أو رفض غير الأساسية منها.
سياسة الخصوصية