المرونة والمراسلة والرصد

أساسيات البث مع Kafka

18 دقيقة الدرس 7 من 12

أساسيات البث مع Kafka

تناولت الدروس السابقة في هذا البرنامج التعليمي أنماط المرونة — قواطع الدارة وإعادة المحاولة والحواجز — التي تحمي الخدمة من فشل التبعيات. افترضت تلك الأنماط نموذج الطلب/الاستجابة المتزامن. يُدخل Apache Kafka نموذجًا مختلفًا جذريًا: البث الحدثي (event streaming). بدلًا من انتظار المُستدعي للردّ، يُلحق المُنتِج حدثًا ثابتًا بسجل دائم، ويقرأ مستهلك واحد أو أكثر من ذلك السجل بشكل مستقل وبالسرعة التي يناسبه.

يُقدّم هذا الدرس التجريدات الأساسية لـ Kafka ويوضح كيفية إنتاج الرسائل واستهلاكها من تطبيق Spring Boot 3 باستخدام Spring Cloud Stream — التجريد على مستوى الإطار الذي يتيح لك تبديل وسطاء الرسائل دون إعادة كتابة منطق الأعمال.

لماذا Kafka؟

تحذف قوائم الانتظار التقليدية (RabbitMQ وActiveMQ) الرسالة فور استهلاكها بنجاح. أما Kafka فمختلف: إذ يُمثّل سجلًا موزعًا مُكرَّرًا (distributed replicated commit log). تُحتفظ بالرسائل لفترة قابلة للضبط (ساعات أو أيام أو أسابيع) بغضّ النظر عن قراءتها. يمنحك هذا خصائص يصعب تحقيقها بقوائم الانتظار:

  • إعادة التشغيل: يمكن لمستهلك جديد البدء من أول السجل وإعادة بناء الحالة.
  • التوزيع على نطاق واسع: يمكن لآلاف مجموعات المستهلكين المستقلة قراءة الموضوع ذاته دون أي تنسيق.
  • دلالات التنفيذ مرة واحدة بالضبط (بإعداد دقيق): تتيح المعاملات والمنتجون المتساوون ضمان معالجة الحدث مرة واحدة تمامًا حتى في حالات الفشل.
  • ضمانات الترتيب: الرسائل داخل قسم واحد مُرتَّبة ترتيبًا صارمًا.
Kafka ليس بديلًا عن قوائم الانتظار في جميع الحالات. إذا كنت تحتاج إلى توجيه معقد أو TTL لكل رسالة أو قوائم انتظار ذات أولوية، فقد لا يزال RabbitMQ هو الأداة المناسبة. يتفوق Kafka على الأحداث الدائمة القابلة لإعادة التشغيل عالية الإنتاجية — سجلات التدقيق وناقلات أحداث النطاق والمسارات اللحظية.

المفاهيم الأساسية لـ Kafka

قبل كتابة أي كود تأكد من وضوح المصطلحات:

  • الموضوع (Topic): سجل مُسمَّى يُلحَق به فقط. فكّر فيه كفئة لأحداثك (مثل order-placed وpayment-processed).
  • القسم (Partition): يُقسَّم الموضوع إلى قسم واحد أو أكثر لتحقيق التوازي. كل قسم سجل مستقل مُرتَّب.
  • الإزاحة (Offset): الموضع التسلسلي للرسالة داخل القسم. لا يُزيل Kafka الرسائل افتراضيًا؛ يتتبع المستهلكون إزاحتهم الخاصة.
  • المُنتِج (Producer): يكتب السجلات في موضوع. يختار القسم الذي يكتب فيه (تناوب أو تجزئة المفتاح أو مخصص).
  • مجموعة المستهلكين (Consumer group): مستهلك واحد أو أكثر يتشاركون معرف مجموعة. يُوزّع Kafka الأقسام على أعضاء المجموعة بحيث يستهلك كل قسم عضو واحد في آنٍ واحد. يمكن لمجموعات متعددة قراءة الموضوع ذاته بشكل مستقل.
  • الوسيط (Broker): خادم Kafka. تحتوي المجموعة على وسطاء متعددين للتكرار وتوزيع الحمل.

نظرة عامة على Spring Cloud Stream

يُغلّف Spring Cloud Stream (SCS) عميل Kafka خلف تجريد رابط (binder). يعمل كود تطبيقك مع حبوب داليةSupplier وFunction وConsumer من java.util.function. يُعيّن SCS هذه إلى مواضيع Kafka عبر الإعداد مما يتيح تبديل الرابط (إلى RabbitMQ مثلًا) دون لمس منطق الأعمال.

أضف رابط Kafka إلى pom.xml:

<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency>

يُوازن BOM الخاص بـ Spring Cloud (المُدار عبر الأصل أو dependencyManagement) الإصدار مع إصدار Spring Boot. لـ Spring Boot 3.x استخدم Spring Cloud 2023.x.

تعريف منتج باستخدام Supplier

تُستطلَع حبة Supplier<T> من قِبل الإطار وفق جدول زمني وتُرسَل قيمتها المُعادة كرسالة. هذا مفيد للمصادر القائمة على الاستطلاع (القراءة من قاعدة بيانات أو توليد أحداث نبضات قلب). للإنتاج المدفوع بالأحداث استخدم StreamBridge بدلًا من ذلك (موضح بعد مثال المستهلك).

import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.time.Instant; import java.util.function.Supplier; @Component public class OrderEventSource { // تُستدعى كل ثانية افتراضيًا؛ تُرسَل القيمة المُعادة إلى موضوع الإخراج المرتبط @Bean public Supplier<OrderPlacedEvent> orderSupplier() { return () -> new OrderPlacedEvent("ORD-" + System.nanoTime(), Instant.now()); } }

اربطها بموضوع Kafka في application.yml:

spring: cloud: stream: bindings: orderSupplier-out-0: # <اسمالحبة>-out-<فهرس> destination: order-placed # اسم موضوع Kafka contentType: application/json kafka: binder: brokers: localhost:9092
اصطلاح التسمية: يبني SCS أسماء الارتباط تلقائيًا كـ <اسمالدالة>-in-0 (مدخل) و<اسمالدالة>-out-0 (مخرج). الرقم 0 هو الفهرس للدوال متعددة المدخلات/المخرجات. تحقق دائمًا من اسم الارتباط أو حدده صراحةً عبر spring.cloud.function.definition.

تعريف مستهلك باستخدام java.util.function.Consumer

تستقبل حبة Consumer<T> الرسائل من موضوع مدخلها المرتبط:

import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.util.function.Consumer; @Component public class OrderEventHandler { @Bean public Consumer<OrderPlacedEvent> processOrder() { return event -> { // منطق الأعمال هنا — مثلًا: تشغيل فحص المخزون System.out.println("Processing order: " + event.orderId()); }; } }
spring: cloud: stream: bindings: processOrder-in-0: destination: order-placed group: inventory-service # معرف مجموعة المستهلكين contentType: application/json

تعيين group أمر بالغ الأهمية في الإنتاج. بدونه يُنشئ SCS مجموعة مجهولة عند كل إعادة تشغيل فتبدأ خدمتك دائمًا بالقراءة من أحدث إزاحة — تُفقد الأحداث الغائبة خلال فترة التوقف بشكل دائم.

الإنتاج الإلزامي باستخدام StreamBridge

عندما تحتاج إلى إرسال رسالة استجابةً لطلب HTTP أو محفز آخر (وليس وفق جدول زمني)، أدرج StreamBridge:

import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.stereotype.Service; @Service public class OrderService { private final StreamBridge streamBridge; public OrderService(StreamBridge streamBridge) { this.streamBridge = streamBridge; } public void placeOrder(Order order) { // ... حفظ في قاعدة البيانات ... OrderPlacedEvent event = new OrderPlacedEvent(order.getId(), order.getCreatedAt()); streamBridge.send("order-placed", event); // اسم الموضوع أو اسم الارتباط } }

مفاتيح الرسائل والتقسيم

يضمن Kafka الترتيب فقط داخل قسم واحد. إذا كنت تحتاج إلى معالجة جميع أحداث الطلب ذاته بالترتيب، يجب أن تصل جميعها إلى القسم ذاته. يتحقق ذلك بتعيين مفتاح رسالة. يكشف SCS عن هذا عبر رأس KafkaHeaders.MESSAGE_KEY:

import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; Message<OrderPlacedEvent> message = MessageBuilder .withPayload(event) .setHeader(KafkaHeaders.MESSAGE_KEY, event.orderId().getBytes()) .build(); streamBridge.send("order-placed", message);

يجزّئ Kafka المفتاح لتحديد القسم، لذا تصل جميع الأحداث التي تشترك في orderId ذاته إلى القسم ذاته وتُعالج بترتيب وصولها.

معالجة الأخطاء ومواضيع الرسائل الميتة

إذا رمى مستهلك استثناءً يُعيد SCS المحاولة افتراضيًا (قابل للضبط عبر maxAttempts). بعد استنفاد المحاولات يمكن إرسال الرسالة إلى موضوع الرسائل الميتة (DLT) بدلًا من إسقاطها بصمت:

spring: cloud: stream: kafka: bindings: processOrder-in-0: consumer: enable-dlq: true # إرسال الرسائل الفاشلة إلى order-placed.DLT dlq-name: order-placed.DLT max-attempts: 3 back-off-initial-interval: 1000 # بالمللي ثانية back-off-max-interval: 10000 back-off-multiplier: 2.0
لا تبتلع استثناءات المستهلك بصمت أبدًا. كتلة catch تسجّل وتُعيد بنجاح ستُقدّم الإزاحة وتُفقد الحدث بشكل دائم. بدلًا من ذلك دع الاستثناء يتشابك لكي يُطبّق SCS منطق إعادة المحاولة والـ DLT. راقب مواضيع DLT الخاصة بك — الأحداث غير المعالجة فيها مؤشر على وجود خطأ أو تعارض في عقد البيانات.

اعتبارات الأمان

يجب أن تُطبّق مجموعات Kafka في الإنتاج المصادقة والتشفير. الأسلوب الأكثر شيوعًا هو SASL/SCRAM عبر TLS:

spring: kafka: security: protocol: SASL_SSL properties: sasl.mechanism: SCRAM-SHA-512 sasl.jaas.config: > org.apache.kafka.common.security.scram.ScramLoginModule required username="${KAFKA_USER}" password="${KAFKA_PASS}"; ssl: trust-store-location: classpath:kafka-truststore.jks trust-store-password: ${KAFKA_TRUSTSTORE_PASS}

حمّل بيانات الاعتماد دائمًا من متغيرات البيئة أو مدير أسرار (Vault أو AWS Secrets Manager). كلمة مرور بنص عادي في application.yml مُلتزَم بها في مستودع الكود خطيرة بالقدر ذاته من كلمة مرور قاعدة بيانات مكشوفة.

بيئة التطوير المحلية مع Docker Compose

أطلق مجموعة Kafka بسيطة للتطوير المحلي بملف docker-compose.yml واحد:

services: zookeeper: image: confluentinc/cp-zookeeper:7.6.0 environment: ZOOKEEPER_CLIENT_PORT: 2181 kafka: image: confluentinc/cp-kafka:7.6.0 depends_on: [zookeeper] ports: - "9092:9092" environment: KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

بدلًا من ذلك استخدم وضع KRaft الأحدث (Kafka دون Zookeeper) المتاح في صور Confluent cp-kafka 7.4+ بحاوية واحدة.

الخلاصة

يجعل نموذج سجل Kafka الدائم القابل لإعادة التشغيل أساسًا لمعماريات الخدمات المصغرة المدفوعة بالأحداث. يتيح لك Spring Cloud Stream إنتاج رسائل Kafka واستهلاكها عبر حبوب Spring المألوفة — Supplier وConsumer وStreamBridge — مع ربط الطوبولوجيا بالكامل عبر الإعداد. الأنماط الواجب إتقانها فورًا هي: اضبط دائمًا مجموعة مستهلكين group، واستخدم مفاتيح الرسائل لضمانات الترتيب، وضبط موضوع رسائل ميتة حتى لا يُفقد أي حدث بصمت. يتناول الدرس القادم التتبع الموزع الذي يصبح ضروريًا بمجرد تدفق الأحداث بشكل غير متزامن عبر الخدمات.