使用 Docker 部署 Kafka 與 Node.js 實現訊息生產與消費

更新 發佈閱讀 12 分鐘
Kafka 是一個高效能、可擴展的分布式事件流平台,常用於處理大量的實時數據流。在這篇文章中將介紹如何使用 Docker 部署 Kafka 和 Zookeeper,並通過 Node.js 實現 Kafka 生產者與消費者的簡單範例,協助你快速理解 Kafka 的基本用法。

Kafka 和 Zookeeper 部署

Kafka 需要 Zookeeper 來協調和管理分布式環境中的服務狀態。可以使用以下的 docker-compose.yml 文件來搭建 Kafka 和 Zookeeper:

version: '3'

services:

zookeeper:

image: wurstmeister/zookeeper:latest

container_name: zookeeper

ports:

- "2181:2181"

kafka:

image: wurstmeister/kafka:latest

container_name: kafka

ports:

- "9093:9093"

environment:

KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://192.168.0.193:9093

KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT

KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9093

KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE

KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

depends_on:

- zookeeper

服務啟動後確認 Kafka 是否正常運行

啟動 Docker 容器後,可以使用以下命令來檢查 Kafka 是否成功啟動:

docker exec kafka kafka-topics.sh --list --bootstrap-server 192.168.0.193:9093

如果成功列出現有的 topic,則表示 Kafka 正常運行

使用 Kafka UI 監控消息發送情況

若希望視覺化監控 Kafka 的消息流動,可以使用 Kafka UI 工具來進行監控。使用以下命令來啟動 Kafka UI:

podman run -it -p 8080:8080 -e DYNAMIC_CONFIG_ENABLED=true provectuslabs/kafka-ui

這將啟動一個可以透過瀏覽器訪問的 UI,幫助你查看和管理 Kafka 中的 topic 和消息。

Node.js 與 Kafka 整合

安裝所需套件

在 Node.js 中,我們可以使用 KafkaJS 來操作 Kafka。首先需要安裝 kafkajs

npm install kafkajs

目錄結構

以下是範例專案的目錄結構:

project/
├── producer.js // Kafka 生產者
├── consumer.js // Kafka 消費者
├── server.js // Node.js 伺服器

Kafka 生產者 (producer.js)

Kafka 生產者負責將消息發送到 Kafka 的指定 topic 中。以下是簡單的 Kafka 生產者程式碼:

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'], // 替換成你的 Kafka broker 地址
});

const producer = kafka.producer();

const sendMessage = async (topic, messages) => {
await producer.connect();
await producer.send({
topic,
messages: Array.isArray(messages) ? messages.map((value) => ({ value })) : [{ value: messages }],
});
console.log(`Messages sent to topic: ${topic}`);
await producer.disconnect();
};

// 如果是直接執行,則進行測試發送
if (require.main === module) {
const topic = 'test-topic';
const messages = ['Hello Kafka!', 'This is a standalone message.'];

sendMessage(topic, messages)
.then(() => console.log('Producer finished successfully.'))
.catch((err) => console.error('Producer encountered an error:', err));
}

module.exports = { sendMessage };

Kafka 消費者 (consumer.js)

Kafka 消費者從 Kafka 中訂閱指定的 topic,並處理收到的消息。以下是簡單的 Kafka 消費者程式碼:

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'], // 替換成你的 Kafka broker 地址
});

const consumer = kafka.consumer({ groupId: 'test-group' });

const startConsumer = async (topic, messageHandler) => {
await consumer.connect();
await consumer.subscribe({ topic, fromBeginning: true });

await consumer.run({
eachMessage: async ({ message }) => {
messageHandler(message.value.toString());
},
});
console.log(`Consumer started for topic: ${topic}`);
};

// 如果是直接執行,則啟動並列印收到的訊息
if (require.main === module) {
const topic = 'test-topic';

startConsumer(topic, (message) => {
console.log(`Standalone Consumer received message: ${message}`);
}).catch((err) => console.error('Consumer encountered an error:', err));
}

module.exports = { startConsumer };

Node.js 伺服器 (server.js)

接下來,我們將使用 Express 框架啟動一個簡單的 HTTP 伺服器,並透過 API 路由觸發 Kafka 生產者和消費者。

const express = require('express');
const { sendMessage } = require('./producer');
const { startConsumer } = require('./consumer');

const app = express();
const PORT = 3000;
const topic = 'test-topic';

// API 路由:發送訊息
app.get('/send', async (req, res) => {
const message = req.query.message || 'Default message';
try {
await sendMessage(topic, [message]);
res.send(`Message "${message}" sent to Kafka.`);
} catch (error) {
console.error('Error sending message:', error);
res.status(500).send('Failed to send message.');
}
});

// API 路由:啟動消費者
app.get('/consume', async (req, res) => {
try {
await startConsumer(topic, (message) => {
console.log(`Received message: ${message}`);
});
res.send('Consumer started. Check logs for received messages.');
} catch (error) {
console.error('Error starting consumer:', error);
res.status(500).send('Failed to start consumer.');
}
});

// 啟動伺服器
app.listen(PORT, () => {
console.log(`Server is running on http://localhost:${PORT}`);
});

啟動服務

  1. 啟動 Kafka 伺服器,確保 Kafka 正常運行。
  2. 啟動 Kafka 消費者(選擇性,僅用於查看消息):
    node consumer.js
  3. 啟動 Express 伺服器:
    node server.js

測試 API

你可以透過以下 API 發送和接收 Kafka 消息:

注意事項

  1. Kafka 設定:確保 Kafka Broker 運行於 localhost:9092(或根據實際情況更新為正確的地址)。
  2. Topic 設定:若 test-topic 尚未建立,KafkaJS 會自動創建它。
  3. 錯誤處理:生產者和消費者應加入完整的錯誤處理邏輯,以應對網絡問題或 Kafka 錯誤。



這篇文章簡單介紹了如何在 Docker 中部署 Kafka 和 Zookeeper,並且提供了在 Node.js 中使用 Kafka 進行訊息生產與消費的範例。希望能幫助你快速上手 Kafka,並了解如何與 Node.js 整合。



留言
avatar-img
嘿洽啦
1會員
7內容數
軟體開發 & 金融投資的日常筆記
嘿洽啦的其他內容
2024/11/22
RabbitMQ 和 Kafka 是兩種流行的消息處理工具,各自擅長不同的應用場景。RabbitMQ 以低延遲和靈活的消息路由著稱,適合即時通信和微服務;Kafka 則專注於高吞吐量和數據持久化,適用於大規模數據流和實時分析。本文比較了它們的性能、擴展性和安全性,幫助你選擇最符合需求的解決方案。
Thumbnail
2024/11/22
RabbitMQ 和 Kafka 是兩種流行的消息處理工具,各自擅長不同的應用場景。RabbitMQ 以低延遲和靈活的消息路由著稱,適合即時通信和微服務;Kafka 則專注於高吞吐量和數據持久化,適用於大規模數據流和實時分析。本文比較了它們的性能、擴展性和安全性,幫助你選擇最符合需求的解決方案。
Thumbnail
2024/11/22
Message Queue 和 Streaming Process 是分佈式系統中最重要的技術之一,但它們的應用場景和特性有明顯區別。消息隊列適合可靠的低延遲通信,而流處理專注於大規模數據流的實時分析。本文深入比較兩者特性,幫助你根據需求選擇合適的技術,打造更高效的系統架構!
Thumbnail
2024/11/22
Message Queue 和 Streaming Process 是分佈式系統中最重要的技術之一,但它們的應用場景和特性有明顯區別。消息隊列適合可靠的低延遲通信,而流處理專注於大規模數據流的實時分析。本文深入比較兩者特性,幫助你根據需求選擇合適的技術,打造更高效的系統架構!
Thumbnail
2024/11/22
本文介紹如何使用 Docker 安裝 RabbitMQ,並將其與 Node.js 結合,實現訊息的生產與消費。這個架構可以應用於分佈式系統或事件驅動架構中,幫助讀者理解如何整合 RabbitMQ 與 Node.js。
Thumbnail
2024/11/22
本文介紹如何使用 Docker 安裝 RabbitMQ,並將其與 Node.js 結合,實現訊息的生產與消費。這個架構可以應用於分佈式系統或事件驅動架構中,幫助讀者理解如何整合 RabbitMQ 與 Node.js。
Thumbnail
看更多
你可能也想看
Thumbnail
我們在「【Message Queue - Kafka】不斷的試誤…, 用Docker來嘗試安裝Kafka」有介紹如何架設kafka, 其中我們使用環境變數來進行kafka的配置, 但除了環境變數之外, 其實還能夠用檔案配置的方式來對kafka進行配置, 如此一來我們就可以將配置檔與啟動檔完全分開,
Thumbnail
我們在「【Message Queue - Kafka】不斷的試誤…, 用Docker來嘗試安裝Kafka」有介紹如何架設kafka, 其中我們使用環境變數來進行kafka的配置, 但除了環境變數之外, 其實還能夠用檔案配置的方式來對kafka進行配置, 如此一來我們就可以將配置檔與啟動檔完全分開,
Thumbnail
這是一場修復文化與重建精神的儀式,觀眾不需要完全看懂《遊林驚夢:巧遇Hagay》,但你能感受心與土地團聚的渴望,也不急著在此處釐清或定義什麼,但你的在場感受,就是一條線索,關於如何找著自己的路徑、自己的聲音。
Thumbnail
這是一場修復文化與重建精神的儀式,觀眾不需要完全看懂《遊林驚夢:巧遇Hagay》,但你能感受心與土地團聚的渴望,也不急著在此處釐清或定義什麼,但你的在場感受,就是一條線索,關於如何找著自己的路徑、自己的聲音。
Thumbnail
本文分析導演巴里・柯斯基(Barrie Kosky)如何運用極簡的舞臺配置,將布萊希特(Bertolt Brecht)的「疏離效果」轉化為視覺奇觀與黑色幽默,探討《三便士歌劇》在當代劇場中的新詮釋,並藉由舞臺、燈光、服裝、音樂等多方面,分析該作如何在保留批判核心的同時,觸及觀眾的觀看位置與人性幽微。
Thumbnail
本文分析導演巴里・柯斯基(Barrie Kosky)如何運用極簡的舞臺配置,將布萊希特(Bertolt Brecht)的「疏離效果」轉化為視覺奇觀與黑色幽默,探討《三便士歌劇》在當代劇場中的新詮釋,並藉由舞臺、燈光、服裝、音樂等多方面,分析該作如何在保留批判核心的同時,觸及觀眾的觀看位置與人性幽微。
Thumbnail
前言 上次我們針對 Docker 這樣容器化技術做了一點介紹,今天我們要來講解 Docker 架構,你是否發現在每次程式上伺服器的流程很麻煩呢 ? 是否發現你寫的程式在別的作業系統不能用呢 ? 如果你遇到這些問題,Docker 都可以幫助你解決這些問題 Docker 架構 在 Docker 這
Thumbnail
前言 上次我們針對 Docker 這樣容器化技術做了一點介紹,今天我們要來講解 Docker 架構,你是否發現在每次程式上伺服器的流程很麻煩呢 ? 是否發現你寫的程式在別的作業系統不能用呢 ? 如果你遇到這些問題,Docker 都可以幫助你解決這些問題 Docker 架構 在 Docker 這
Thumbnail
Kafka是一個先進的分佈式流處理平臺,具有高吞吐量、可擴展性、容錯性和低延遲特性,提供瞭解耦、非同步和削峰特點。本文介紹了Kafka的通訊模式、適合的應用場景和未來發展趨勢,旨在幫助使用者更好地理解和應用Kafka。
Thumbnail
Kafka是一個先進的分佈式流處理平臺,具有高吞吐量、可擴展性、容錯性和低延遲特性,提供瞭解耦、非同步和削峰特點。本文介紹了Kafka的通訊模式、適合的應用場景和未來發展趨勢,旨在幫助使用者更好地理解和應用Kafka。
Thumbnail
我們在「【Message Queue - Kafka】串流時代的超入門簡介」有介紹到關於Kafka的基礎概念, 那麼本章節主要著重於生產者(Producer)的面向來細部探討, 看看生產者(Producer)究竟是什麼? 有哪些應該要注意的? 我們今天的主題除了說明生產者(Producer)的
Thumbnail
我們在「【Message Queue - Kafka】串流時代的超入門簡介」有介紹到關於Kafka的基礎概念, 那麼本章節主要著重於生產者(Producer)的面向來細部探討, 看看生產者(Producer)究竟是什麼? 有哪些應該要注意的? 我們今天的主題除了說明生產者(Producer)的
Thumbnail
連接器故名思議就是兩個系統之間的橋樑, 而Kafka Connect正是扮演著這樣的角色, 如圖上, 我們可以透過Kafka Connect將SQL的資料導出到Kafka並導入到MySQL。 豐富的Plugin Confluent Hub提供了各式各樣的外掛套件, 包括了MongoDB、My
Thumbnail
連接器故名思議就是兩個系統之間的橋樑, 而Kafka Connect正是扮演著這樣的角色, 如圖上, 我們可以透過Kafka Connect將SQL的資料導出到Kafka並導入到MySQL。 豐富的Plugin Confluent Hub提供了各式各樣的外掛套件, 包括了MongoDB、My
Thumbnail
背景:從冷門配角到市場主線,算力與電力被重新定價   小P從2008進入股市,每一個時期的投資亮點都不同,記得2009蘋果手機剛上市,當時蘋果只要在媒體上提到哪一間供應鏈,隔天股價就有驚人的表現,當時光學鏡頭非常熱門,因為手機第一次搭上鏡頭可以拍照,也造就傳統相機廠的殞落,如今手機已經全面普及,題
Thumbnail
背景:從冷門配角到市場主線,算力與電力被重新定價   小P從2008進入股市,每一個時期的投資亮點都不同,記得2009蘋果手機剛上市,當時蘋果只要在媒體上提到哪一間供應鏈,隔天股價就有驚人的表現,當時光學鏡頭非常熱門,因為手機第一次搭上鏡頭可以拍照,也造就傳統相機廠的殞落,如今手機已經全面普及,題
Thumbnail
《轉轉生》(Re:INCARNATION)為奈及利亞編舞家庫德斯.奧尼奎庫與 Q 舞團創作的當代舞蹈作品,結合拉各斯街頭節奏、Afrobeat/Afrobeats、以及約魯巴宇宙觀的非線性時間,建構出關於輪迴的「誕生—死亡—重生」儀式結構。本文將從約魯巴哲學概念出發,解析其去殖民的身體政治。
Thumbnail
《轉轉生》(Re:INCARNATION)為奈及利亞編舞家庫德斯.奧尼奎庫與 Q 舞團創作的當代舞蹈作品,結合拉各斯街頭節奏、Afrobeat/Afrobeats、以及約魯巴宇宙觀的非線性時間,建構出關於輪迴的「誕生—死亡—重生」儀式結構。本文將從約魯巴哲學概念出發,解析其去殖民的身體政治。
Thumbnail
為什麼要用Docker安裝? Docker是一個容器化平台, 就類似於我們早期虛擬機的VMWare、Virtual Box…等, 虛擬機平台一般, 只是面向的是伺服端, 供企業快速、簡單、輕量的佈署開發完成的程式軟體, 並將相關的環境依賴皆封裝成一包所謂的映像檔(image), 透過這樣的方式減少
Thumbnail
為什麼要用Docker安裝? Docker是一個容器化平台, 就類似於我們早期虛擬機的VMWare、Virtual Box…等, 虛擬機平台一般, 只是面向的是伺服端, 供企業快速、簡單、輕量的佈署開發完成的程式軟體, 並將相關的環境依賴皆封裝成一包所謂的映像檔(image), 透過這樣的方式減少
Thumbnail
今天簡單演示如何在Kubernetes cluster利用local-path provider建構出storage class來提供應用程式進行資料的存放。
Thumbnail
今天簡單演示如何在Kubernetes cluster利用local-path provider建構出storage class來提供應用程式進行資料的存放。
Thumbnail
前言 大家好我們今天要來教 Docker 這項技術,什麼是 Docker ? Docker 可以幫助我們做什麼事情 ? Docker 是一項容器化技術,他可以降低我們在佈署 App 時,讓我們可以有效的分配作業系統資源,降低佈署作業成本,現在讓我們來了解 Docker 要解決的問題 傳統佈署遇
Thumbnail
前言 大家好我們今天要來教 Docker 這項技術,什麼是 Docker ? Docker 可以幫助我們做什麼事情 ? Docker 是一項容器化技術,他可以降低我們在佈署 App 時,讓我們可以有效的分配作業系統資源,降低佈署作業成本,現在讓我們來了解 Docker 要解決的問題 傳統佈署遇
追蹤感興趣的內容從 Google News 追蹤更多 vocus 的最新精選內容追蹤 Google News