أدوات التزامن المتقدّمة

أساسيات CompletableFuture

15 دقيقة الدرس 5 من 13

أساسيات CompletableFuture

قدّمت Java 8 كلاس CompletableFuture<T> كتوسعة للـ Future<T> القديم، لكنّه يمنحك ما افتقده الأصل تمامًا: القدرة على تسجيل استدعاءات راجعة (callbacks)، وتسلسل التحويلات، وبناء مسارات معالجة غير متزامنة كاملة — كل ذلك دون حجب خيط الانتظار للنتيجة.

في هذا الدرس نركّز على أسلوبين يشكّلان عمود فقرة كل مسار معالجة: supplyAsync لبدء عمل غير متزامن ينتج قيمة، وthenApply لتحويل تلك القيمة عند وصولها.

لماذا CompletableFuture وليس Future العادي؟

مع Future العادي، الطريقة الوحيدة للحصول على النتيجة هي استدعاء get()، الذي يحجب الخيط الحالي حتى تنتهي العملية الحسابية. هذا يجعل بناء المسارات مستحيلاً ويُلغي فائدة عدم التزامن.

// الطريقة القديمة بـ Future — يضطر الخيط للانتظار ExecutorService pool = Executors.newFixedThreadPool(4); Future<String> future = pool.submit(() -> fetchFromDatabase()); String result = future.get(); // الخيط يُحجب هنا — لا شيء آخر يعمل System.out.println(result);

يحلّ CompletableFuture هذه المشكلة بالسماح لك بإرفاق استدعاء راجع يُنفَّذ تلقائيًا بمجرد توفّر القيمة، محرّرًا الخيط لأداء أعمال أخرى في هذه الأثناء.

supplyAsync — بدء عملية حسابية غير متزامنة

يُرسل CompletableFuture.supplyAsync(Supplier<T>) lambda إلى مجموعة خيوط ويُعيد فورًا CompletableFuture<T>. يواصل الخيط الاستدعائي عمله؛ تعمل lambda في الخلفية.

import java.util.concurrent.CompletableFuture; CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> { // محاكاة عملية I/O بطيئة Thread.sleep(500); // استثناء محتمل — لفّه أو أعِد رميه return "order-42"; }); // الخيط الاستدعائي لم يُحجب؛ cf مُعادة بالفعل System.out.println("تم إرسال الطلب، جارٍ تنفيذ أعمال أخرى...");
أي مجموعة خيوط تُستخدم؟ بشكل افتراضي يستخدم supplyAsync المجموعة المشتركة للـ JVM وهي ForkJoinPool.commonPool(). هذه المجموعة مشتركة بين التطبيق بأكمله، لذا في كود الإنتاج الذي يجري استدعاءات I/O (شبكة، قرص) عليك تمرير executor خاص بك كوسيط ثانٍ لتجنّب حرمان المهام المرتبطة بالمعالج.
// تمرير executor مخصص — دومًا الأفضل لعمليات I/O ExecutorService ioPool = Executors.newFixedThreadPool(8); CompletableFuture<String> cf = CompletableFuture.supplyAsync( () -> callExternalApi(), ioPool );

thenApply — تحويل النتيجة

يُسجّل thenApply(Function<T, U>) تحويلًا سيعمل على الخيط نفسه الذي أكمل الـ future (أو الخيط الاستدعائي إن كانت قد اكتملت بالفعل). ويُعيد CompletableFuture<U> جديدًا يحمل القيمة المُحوَّلة.

CompletableFuture<Integer> pipeline = CompletableFuture .supplyAsync(() -> " hello world ") // CF<String> .thenApply(String::trim) // CF<String> — "hello world" .thenApply(String::toUpperCase) // CF<String> — "HELLO WORLD" .thenApply(String::length); // CF<Integer> — 11 System.out.println(pipeline.get()); // 11

كل استدعاء لـ thenApply يُعيد CompletableFuture جديدًا. يبقى الأصلي دون تغيير. هذا الأسلوب القائم على مسار غير قابل للتعديل يجعل كل خطوة قابلة للتركيب والاختبار باستقلالية.

فكّر بالأنواع. المعامل العام يخبرك بالضبط ما تُنتجه كل مرحلة: CompletableFuture<String>thenApply(String::length)CompletableFuture<Integer>. إذا استنتج IDE الأنواع لك، تستطيع اكتشاف أخطاء المنطق (مثل تطبيق دالة String على Integer) في وقت الترجمة.

مثال كامل من البداية إلى النهاية

فيما يلي مسار معالجة صغير لكنه واقعي: جلب معرّف مستخدم من خدمة بطيئة، ثم البحث عن بريده الإلكتروني، ثم تنسيق رسالة ترحيب — كل ذلك دون حجب أي خيط باستثناء استدعاء get() الأخير في النهاية.

import java.util.concurrent.*; public class PipelineDemo { static String fetchUserId() { // محاكاة زمن استجابة الشبكة sleep(300); return "usr-7"; } static String fetchEmail(String userId) { sleep(200); return userId + "@example.com"; } static String formatGreeting(String email) { return "Welcome, " + email + "!"; } public static void main(String[] args) throws Exception { ExecutorService pool = Executors.newFixedThreadPool(4); CompletableFuture<String> greeting = CompletableFuture .supplyAsync(PipelineDemo::fetchUserId, pool) // "usr-7" .thenApply(PipelineDemo::fetchEmail) // "usr-7@example.com" .thenApply(PipelineDemo::formatGreeting); // "Welcome, usr-7@example.com!" // هذا السطر وحده يحجب — المسار بأكمله غير محجوب System.out.println(greeting.get()); pool.shutdown(); } static void sleep(long ms) { try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }

thenApplyAsync — إسناد التحويل لخيط منفصل

ينفّذ thenApply استدعاءه على الخيط الذي أكمل المرحلة السابقة. إن كان التحويل نفسه مكلفًا، استخدم thenApplyAsync لإحالته إلى خيط من المجموعة بدلًا من ذلك.

CompletableFuture<byte[]> pipeline = CompletableFuture .supplyAsync(() -> downloadFile(url), ioPool) .thenApplyAsync(bytes -> compress(bytes), cpuPool); // عمل CPU ثقيل — استخدم مجموعة مخصصة
لا تحجب داخل thenApply. استدعاء Thread.sleep() أو استعلامات JDBC أو أي عملية حجب أخرى داخل استدعاء thenApply يُشغل خيط المجموعة طوال فترة الانتظار. استخدم thenCompose (يُغطّى في الدرس القادم) للخطوات التي تُعيد هي نفسها CompletableFuture، ومنح عمليات I/O دائمًا executor خاصًا بها.

استرداد النتائج: join مقابل get

يحجب كلا الأسلوبين get() وjoin() حتى تتوفّر النتيجة. الفرق في معالجة الاستثناءات: يرمي get() استثناءات محتملة (InterruptedException، ExecutionException)، بينما يلفّ join() كل شيء في CompletionException غير محتملة. داخل سلاسل lambda يكون join() أكثر عملية؛ في كود التطبيق حيث تريد معالجة الانقطاع صراحةً، فضّل get().

الخلاصة

يُرسل supplyAsync العمل إلى مجموعة خيوط ويُعيد مؤشرًا حيًا للنتيجة المستقبلية. يُسلسل thenApply تحويلًا نقيًا على ذلك المؤشر دون حجب. تسلسل عدة استدعاءات thenApply ينتج مسارًا مقروءًا وآمن الأنواع حيث كل خطوة دالة صغيرة ومحدّدة الهدف. مرّر دائمًا executor مسمى للمراحل المرتبطة بـ I/O، ولا تحجب أبدًا داخل الاستدعاء الراجع. الدرس القادم يوسّع هذه الأنماط بـ thenCompose وthenCombine ومعالجة الأخطاء.