أدوات التزامن المتقدّمة

أدوات المزامنة

15 دقيقة الدرس 9 من 13

أدوات المزامنة

تتيح لك مجمّعات الخيوط والـ futures تشغيل العمل بالتزامن، لكن الخيوط أحيانًا تحتاج إلى التنسيق فيما بينها — ينتظر خيط واحد حتى تنتهي خيوط أخرى، أو لا يجب أن يُثقَل مورد محدود بالطلبات، أو يجب أن تصل مجموعة من الخيوط إلى نقطة مشتركة قبل أن يستمر أي منها. توفّر حزمة java.util.concurrent ثلاث أدوات مزامنة متخصصة لهذه السيناريوهات: CountDownLatch وSemaphore وCyclicBarrier. تحل كل أداة منها مشكلة تنسيق مختلفة، واختيار الأداة الصحيحة يجعل الكود المتزامن أبسط وأأمن بكثير من محاولة تحقيق الأثر ذاته باستخدام wait/notify الخام أو الأعلام اليدوية.

CountDownLatch — انتظر حدوث N أحداث

يُهيَّأ CountDownLatch بعدّاد. يمكن لأي خيط استدعاء await() للحجب حتى يصل العداد إلى الصفر. تستدعي الخيوط الأخرى countDown() لتناقص العداد. حين يصل العداد إلى الصفر يُفتح المزلاج بشكل دائم — تُحرَّر جميع الخيوط المنتظرة وتعود استدعاءات await() اللاحقة فورًا.

import java.util.concurrent.*; public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { int workers = 5; CountDownLatch ready = new CountDownLatch(workers); // ينتظر المنسّق جاهزية العمّال CountDownLatch start = new CountDownLatch(1); // ينتظر العمّال إشارة البدء CountDownLatch done = new CountDownLatch(workers); // ينتظر المنسّق إتمام العمل ExecutorService pool = Executors.newFixedThreadPool(workers); for (int i = 0; i < workers; i++) { final int id = i + 1; pool.submit(() -> { System.out.printf("Worker %d: ready%n", id); ready.countDown(); // إشارة: أنا جاهز try { start.await(); // انتظر الإشارة System.out.printf("Worker %d: working...%n", id); Thread.sleep(200 + (long)(Math.random() * 300)); System.out.printf("Worker %d: done%n", id); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { done.countDown(); // إشارة: انتهيت } }); } ready.await(); // انتظر حتى يكون جميع العمّال جاهزين System.out.println("All workers ready — GO!"); start.countDown(); // أطلق إشارة البدء done.await(); // انتظر انتهاء الجميع pool.shutdown(); System.out.println("All workers finished."); } }

يستخدم هذا النمط الكلاسيكي "بداية السباق" مزلاجَين: الأول ليُبقي المنسّق منتظرًا حتى يكون كل عامل في موقعه، والثاني لاكتشاف انتهاء جميع الأعمال. لاحظ أن مزلاج start يُهيَّأ بـ 1 — استدعاء countDown() واحد يُحرّر جميع العمّال المنتظرين في آنٍ واحد.

المزاليج للاستخدام الفردي. حين يصل العداد إلى الصفر لا يمكن إعادة تهيئة CountDownLatch. إذا احتجت إلى إعادة استخدام الحاجز استخدم CyclicBarrier بدلًا منه (مُغطّى أدناه). لبوابات بدء التشغيل الأحادية وفحوصات جاهزية الخدمة ومزامنة الاختبارات يُعدّ المزلاج مثاليًا.

حالة استخدام أبسط وشائعة بنفس القدر: انتظار اكتمال N استدعاء خدمة متوازٍ قبل المتابعة.

CountDownLatch latch = new CountDownLatch(3); ExecutorService pool = Executors.newCachedThreadPool(); for (String service : List.of("auth", "inventory", "pricing")) { pool.submit(() -> { try { callService(service); // استدعاء HTTP/RPC حاجز } finally { latch.countDown(); // أنقص دائمًا، حتى عند الفشل } }); } latch.await(10, TimeUnit.SECONDS); // انتظر محدود — لا تحجب إلى الأبد pool.shutdown();
ضع countDown() دائمًا داخل كتلة finally. إذا رمى العامل استثناءً وفُوِّت countDown()، سينتظر أي خيط محجوب في await() إلى الأبد. ضمان finally هو النمط الآمن الوحيد.

Semaphore — قيّد الوصول المتزامن إلى مورد

يتحكم Semaphore في الوصول إلى مورد محدود عبر الاحتفاظ بمجموعة من التصاريح. يستدعي الخيط acquire() للحصول على تصريح (يحجب إذا لم يكن متاحًا) ويستدعي release() عند انتهائه. على خلاف mutex (المقفول أو المفتوح)، يمكن للـ semaphore الاحتفاظ بأي عدد من التصاريح — بذلك يمكنك السماح لثلاثة خيوط فقط بالوصول إلى مجمّع اتصالات قاعدة البيانات في وقت واحد مثلًا.

import java.util.concurrent.*; public class ConnectionPoolDemo { private static final Semaphore POOL = new Semaphore(3); // 3 اتصالات متزامنة على الأكثر static void useConnection(int threadId) throws InterruptedException { POOL.acquire(); // يحجب إذا أُخذت جميع التصاريح الثلاثة try { System.out.printf("Thread %d acquired connection (available: %d)%n", threadId, POOL.availablePermits()); Thread.sleep(500); // محاكاة عمل قاعدة البيانات } finally { POOL.release(); // أطلق دائمًا في finally System.out.printf("Thread %d released connection%n", threadId); } } public static void main(String[] args) throws InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(8); for (int i = 1; i <= 8; i++) { final int id = i; pool.submit(() -> { try { useConnection(id); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }); } pool.shutdown(); pool.awaitTermination(15, TimeUnit.SECONDS); } }

شغّل هذا وسترى ما يصل إلى ثلاث رسائل "acquired" قبل أول رسالة "released" — يُطبّق الـ semaphore الحد الأقصى في وقت التشغيل.

العدالة: يستخدم Semaphore سياسة غير عادلة افتراضيًا — لا تُخدَم الخيوط المنتظرة للحصول على تصريح وفق ترتيب وصولها. مرّر true للبنّاء للحصول على semaphore عادل (FIFO). تملك الـ semaphores العادلة إنتاجية أقل لكنها تمنع التجويع، وهذا مهم للموارد المتنازع عليها بكثافة لفترات طويلة.

Semaphore fair = new Semaphore(3, true); // ترتيب FIFO عادل
استخدم semaphore لتقييد الاستدعاءات الخارجية. تحديد معدل طلبات API الصادرة (مثل خدمة خارجية تسمح بـ 10 اتصالات متزامنة فقط) هو أحد أوضح حالات الاستخدام الحقيقية. ادمجه مع المتغير ذي المهلة — tryAcquire(timeout, unit) — لكي يفشل المستدعون سريعًا إذا كان المورد محمّلًا بدلًا من التكدّس إلى أجل غير مسمّى.

يتصرف الـ semaphore بتصريح واحد مثل mutex — لكنه ليس قابلًا لإعادة الدخول. على خلاف synchronized أو ReentrantLock، يمكن لخيط مختلف استدعاء release() على semaphore اكتسبه خيط آخر. هذا يجعل الـ semaphores مفيدة للإشارة بين المنتج والمستهلك، لا للإقصاء المتبادل فقط.

CyclicBarrier — زامِن مجموعة خيوط عند نقطة مشتركة

يُهيَّأ CyclicBarrier بعدد المشاركين. يستدعي كل خيط مشارك await() عند وصوله للحاجز. آخر خيط يصل يُشغّل إجراء الحاجز الاختياري (وهو Runnable يُشغَّل في ذلك الخيط)، ثم تُحرَّر جميع الخيوط. يُعيد الحاجز تهيئة نفسه تلقائيًا للدورة التالية — من هنا جاءت "الدورية".

import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; public class MatrixRowProcessor { static final int THREADS = 4; static final int ROUNDS = 3; public static void main(String[] args) throws InterruptedException { AtomicInteger globalSum = new AtomicInteger(0); // إجراء الحاجز: يُشغَّل مرة واحدة بعد أن تصل جميع الخيوط للحاجز في كل جولة Runnable barrierAction = () -> System.out.printf("=== Round complete. Running total: %d ===%n", globalSum.get()); CyclicBarrier barrier = new CyclicBarrier(THREADS, barrierAction); ExecutorService pool = Executors.newFixedThreadPool(THREADS); for (int t = 0; t < THREADS; t++) { final int threadId = t; pool.submit(() -> { try { for (int round = 1; round <= ROUNDS; round++) { // محاكاة عمل الخيط (احسب مجموعًا جزئيًا) int partial = (threadId + 1) * round; globalSum.addAndGet(partial); System.out.printf("Thread %d finished round %d (partial=%d)%n", threadId, round, partial); barrier.await(); // انتظر انتهاء جميع الخيوط من هذه الجولة } } catch (InterruptedException | BrokenBarrierException e) { Thread.currentThread().interrupt(); } }); } pool.shutdown(); pool.awaitTermination(30, TimeUnit.SECONDS); } }

الفرق الجوهري عن المزلاج: يُعيد الحاجز تهيئة نفسه بعد كل جولة. بعد أن تستدعي الخيوط الأربعة await() في الجولة الأولى، يُعاد تهيئة الحاجز وتستطيع الخيوط ذاتها استدعاء await() مجددًا في الجولة الثانية دون إنشاء كائن جديد.

ما هو BrokenBarrierException؟ إذا انقطع خيط أو انتهت مهلته أثناء الانتظار عند الحاجز، يدخل الحاجز حالة مكسورة وتتلقى جميع الخيوط المنتظرة حاليًا أو لاحقًا استثناء BrokenBarrierException. يمنع هذا حالة الإغلاق الشائعة حيث يموت خيط واحد ويبقى الباقون ينتظرون إلى الأبد. عالج دائمًا كلًّا من InterruptedException وBrokenBarrierException عند استدعاء barrier.await().

الاستخدام الحقيقي النموذجي هو تدفقات معالجة البيانات المتوازية: تُقسَّم مصفوفة أو مجموعة بيانات كبيرة إلى N قطعة، يعالج كل عامل قطعته في المرحلة الأولى، تتزامن جميع العمّال، ثم يعالج كل عامل قطعته في المرحلة الثانية باستخدام نتائج المرحلة الأولى، وهكذا. إعادة التهيئة الدورية تجعل هذا نمط التدفق موجزًا.

اختيار أداة المزامنة الصحيحة

  • CountDownLatch — انتظار أحادي: ينتظر منسّق حدوث N أحداث. فكّر في بوابات بدء تشغيل الخدمات وفحوصات الجاهزية وأنماط join-all التي لا تحتاج إلى إعادة استخدام.
  • Semaphore — تقييد الموارد: احدد عدد الخيوط التي يمكنها الاحتفاظ بمورد في آن واحد. فكّر في مجمّعات الاتصال ومُحددات المعدل وحراسات الوصول.
  • CyclicBarrier — التنسيق متعدد المراحل: يجب أن تصل N خيوط معًا إلى كل حد مرحلة قبل أن يستمر أي منها. فكّر في الخوارزميات المتوازية ذات نقاط المزامنة بين المراحل وخطوات المحاكاة وجولات معالجة البيانات الجماعية.
فضّل أعلى مستوى تجريد. إذا وجدت نفسك تستخدم AtomicInteger خامًا وحلقة while لانتظار شرط ما، تراجع للخلف — إحدى هذه الأدوات الثلاث، أو Phaser لسيناريوهات متعددة المراحل أكثر تعقيدًا، ستعطيك السلوك ذاته بكود أقل بكثير وأخطاء أقل بكثير.

الخلاصة

CountDownLatch هو بوابة أحادية الاستخدام: تنتظر الخيوط حتى يصل عدّاد الأحداث إلى الصفر. يُطبّق Semaphore حدودًا على التزامن في مورد مشترك باستخدام تصاريح acquire/release. يجعل CyclicBarrier مجموعة من الخيوط تلتقي عند نقاط تفتيش متكررة، مع تشغيل إجراء حاجز اختياري قبل تحريرها. الثلاثة جميعها أدوات مُختبَرة من java.util.concurrent تحل محل المزامنة اليدوية الهشة والمعرّضة للأخطاء. أطلق التصاريح وأنقص المزاليج دائمًا في كتل finally، وعالج BrokenBarrierException عند كل حاجز، واستخدم المتغيرات ذات المهلة لتجنّب الحجب غير المحدود في كود الإنتاج.