Building AI applications that truly deliver has been my obsession lately, and I’ve finally cracked something worth sharing. By creating a Kafka-MCP server and connecting it with our existing Qdrant-MCP server, we’ve transformed how our team handles communication and data retrieval. The real magic happened when we linked this setup to Claude for Desktop — suddenly our message passing became seamless and vector searches lightning-fast. Our AI and Agentic Applications are now not just more efficient but genuinely intuitive, handling complex data processing tasks using standard protocols. So in this article I will show you how to build an MCP server for Kafka and Qdrant.

The Architecture:
Clients (Claude Desktop)
The system begins with the client application, the user-facing frontend, which in our setup is Claude Desktop. The client communicates with two distinct server components via the Model Context Protocol (MCP), which facilitates structured data exchange between the clients and servers.
Kafka Server
One branch of communication connects clients to a Kafka MCP Server. This server interfaces with Apache Kafka, a distributed event streaming platform capable of handling high-throughput, fault-tolerant real-time data feeds. The publish/consume relationship shown in the diagram indicates that:
- Clients can publish data or events to Kafka topics
- Clients can consume data streams from Kafka topics
- The system leverages Kafka’s capabilities for event sourcing and real-time analytics
Qdrant Server
The second communication path links clients to a Qdrant MCP Server. Qdrant is a vector similarity search engine designed for production-ready, high-load applications. The store/find connection depicted in the diagram suggests that:
- Clients can store vector embeddings or other structured data in Qdrant
- Clients can perform similarity searches or retrieval operations
- The system uses Qdrant for semantic search, recommendation systems, or other vector-based operations
A notable detail is the kafka-qdrant connector shown in the diagram, indicating that data flows between the Kafka ecosystem and Qdrant. This connector keeps the two systems consistent, allowing events published to Kafka to update the vector store or trigger operations in Qdrant.
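To make that flow concrete, the qdrant-kafka sink connector consumes JSON messages from the configured topic and upserts them into Qdrant as points. A message shaped like the following sketch (field names per the connector's documented format; the id, vector, and payload values here are made up for illustration) would land directly in the vector store:
{
  "collection_name": "test",
  "id": "4e4b8098-5b6a-4c9e-8f30-37e1a0f3f3bd",
  "vector": [0.12, 0.87, 0.45, 0.33],
  "payload": {"source": "claude-desktop", "text": "hello qdrant"}
}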
What is MCP?
The Model Context Protocol (MCP) is an open standard developed by Anthropic to streamline interactions between AI models and external tools, data sources, and services. By providing a standardized framework, MCP allows large language models (LLMs) to access and process real-time information beyond their static training data, enhancing their functionality and adaptability.
Key Advantages of MCP:
Enhanced AI Capabilities: By enabling AI models to interact directly with external systems, MCP allows them to perform tasks such as retrieving up-to-date information, executing actions within applications, and utilizing specialized tools, thereby extending their core functionalities.
Scalability: MCP's standardized protocol facilitates easier scaling of AI services, allowing developers to integrate more tools and data sources without significantly increasing complexity.
The Implementation:
Install Confluent Kafka:
Confluent Platform offers several installation options tailored to different environments and preferences:
ZIP and TAR Archives:
- Suitable for both development and production setups, you can download the platform as ZIP or TAR files. After downloading, extract the contents and configure the environment variables as needed.
Package Managers:
- Debian and Ubuntu: Utilize APT repositories to install Confluent Platform using systemd. This method is ideal for multi-node environments.
- RHEL, CentOS, and Fedora: Use YUM repositories for installation, ensuring integration with systemd for service management.
- Docker: For containerized environments, Confluent provides Docker images. This approach is beneficial for both development and testing scenarios (see the sketch after this list).
- Confluent CLI: The Confluent Command Line Interface can be installed separately, offering streamlined management of Confluent Platform components. It's available for macOS, Linux, and Windows.
Orchestrated Installations:
- Confluent for Kubernetes: Deploy Confluent Platform on Kubernetes clusters, leveraging Kubernetes' orchestration capabilities.
- Ansible Playbooks: Automate the deployment process using Ansible, suitable for consistent setups across multiple environments.
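For example, the Docker route can be tried quickly with Confluent's cp-all-in-one quickstart (a sketch; the repository layout and compose file are Confluent's quickstart assets, so check the docs for the release you target):
git clone https://github.com/confluentinc/cp-all-in-one
cd cp-all-in-one/cp-all-in-one
docker compose up -d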
Once Confluent Platform is installed, bring it up and running as below (Mac version). Once the platform is up, navigate to http://localhost:9021.
confluent local services start
# The local commands are intended for a single-node development environment only, NOT for production usage. See more: https://docs.confluent.io/current/cli/index.html
# As of Confluent Platform 8.0, Java 8 will no longer be supported.
# Using CONFLUENT_CURRENT: /var/folders/1x/36nz44_569501n4xs0px2_8m0000gn/T/confluent.288152
# ZooKeeper is [UP]
# Kafka is [UP]
# Schema Registry is [UP]
# Kafka REST is [UP]
# Connect is [UP]
# Starting ksqlDB Server
# ksqlDB Server is [UP]
# Starting Control Center
# Control Center is [UP]

Install the Kafka-Qdrant connector using the Confluent CLI as below.
confluent-hub install qdrant/qdrant-kafka:1.1.0
The component can be installed in any of the following Confluent Platform installations:
1. /Users/pavanmantha/Pavans/confluent-7.9.0 (based on $CONFLUENT_HOME)
2. /Users/pavanmantha/Pavans/confluent-7.9.0 (where this tool is installed)
Choose one of these to continue the installation (1-2): 1
Do you want to install this into /Users/pavanmantha/Pavans/confluent-7.9.0/share/confluent-hub-components? (yN) y
Component's license:
The Apache License, Version 2.0
https://www.apache.org/licenses/LICENSE-2.0
I agree to the software license agreement (yN) y
You are about to install 'qdrant-kafka' from Qdrant, as published on Confluent Hub.
Do you want to continue? (yN) y
Downloading component Qdrant Connector for Apache Kafka 1.1.2, provided by Qdrant from Confluent Hub and installing into /Users/pavanmantha/Pavans/confluent-7.9.0/share/confluent-hub-components
Detected Worker's configs:
1. Standard: /Users/pavanmantha/Pavans/confluent-7.9.0/etc/kafka/connect-distributed.properties
2. Standard: /Users/pavanmantha/Pavans/confluent-7.9.0/etc/kafka/connect-standalone.properties
3. Standard: /Users/pavanmantha/Pavans/confluent-7.9.0/etc/schema-registry/connect-avro-distributed.properties
4. Standard: /Users/pavanmantha/Pavans/confluent-7.9.0/etc/schema-registry/connect-avro-standalone.properties
5. Based on CONFLUENT_CURRENT: /var/folders/1x/36nz44_569501n4xs0px2_8m0000gn/T/confluent.288152/connect/connect.properties
6. Used by Connect process with PID 29014: /var/folders/1x/36nz44_569501n4xs0px2_8m0000gn/T/confluent.288152/connect/connect.properties
Do you want to update all detected configs? (yN) y
Adding installation directory to plugin path in the following files:
/Users/pavanmantha/Pavans/confluent-7.9.0/etc/kafka/connect-distributed.properties
/Users/pavanmantha/Pavans/confluent-7.9.0/etc/kafka/connect-standalone.properties
/Users/pavanmantha/Pavans/confluent-7.9.0/etc/schema-registry/connect-avro-distributed.properties
/Users/pavanmantha/Pavans/confluent-7.9.0/etc/schema-registry/connect-avro-standalone.properties
/var/folders/1x/36nz44_569501n4xs0px2_8m0000gn/T/confluent.288152/connect/connect.properties
/var/folders/1x/36nz44_569501n4xs0px2_8m0000gn/T/confluent.288152/connect/connect.properties
Completed

Edit the configuration of the QdrantSinkConnector as shown below, or upload a pre-saved configuration from one of your previous projects.
{
  "name": "QdrantSinkConnectorConnector_0",
  "config": {
    "name": "QdrantSinkConnectorConnector_0",
    "connector.class": "io.qdrant.kafka.QdrantSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "topics": "mcp_topic",
    "errors.deadletterqueue.topic.name": "dead_queue",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.deadletterqueue.context.headers.enable": "true",
    "qdrant.grpc.url": "https://0029f2c8-edd5-412d-8924-a7c8f0481362.europe-west3-0.gcp.cloud.qdrant.io",
    "qdrant.api.key": "****************************************************************************************************",
    "value.converter.schemas.cache.size": "1",
    "value.converter.schemas.enable": "false"
  }
}
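Alternatively, instead of clicking through Control Center, the same JSON can be submitted straight to the Kafka Connect REST API (port 8083 by default); assuming you saved it as qdrant-sink.json:
curl -X POST -H "Content-Type: application/json" \
  --data @qdrant-sink.json \
  http://localhost:8083/connectors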

Once saved, the QdrantSinkConnector should be in the running state, and the CDC pipeline is now ready.
The Kafka MCP Server:
The project structure of the Kafka MCP server looks as below.
.
├── LICENSE
├── README.md
├── kafka_mcp_server.iml
├── requirements.txt
└── src
    ├── __pycache__
    └── kafka_mcp_server
        ├── __init__.py
        ├── kafka.py
        ├── main.py
        ├── server.py
        └── settings.py
settings.py
from pydantic_settings import BaseSettings
from pydantic import Field
from typing import Optional

DEFAULT_TOOL_PUBLISH_DESCRIPTION = (
    "publish the information to the kafka topic for the down stream usage."
)
DEFAULT_TOOL_CONSUME_DESCRIPTION = (
    "Look up topics in kafka. Use this tool when you need to: \n"
    " - consume information from the topics\n"
)


class ToolSettings(BaseSettings):
    """
    Configuration for all the tools.
    """

    tool_publish_description: str = Field(
        default=DEFAULT_TOOL_PUBLISH_DESCRIPTION,
        validation_alias="TOOL_PUBLISH_DESCRIPTION",
    )
    tool_consume_description: str = Field(
        default=DEFAULT_TOOL_CONSUME_DESCRIPTION,
        validation_alias="TOOL_CONSUME_DESCRIPTION",
    )


class KafkaSettings(BaseSettings):
    """
    Configuration for the Kafka connector.
    """

    bootstrap_server: Optional[str] = Field(default=None, validation_alias="KAFKA_BOOTSTRAP_SERVERS")
    topic_name: Optional[str] = Field(default=None, validation_alias="TOPIC_NAME")
    from_beginning: Optional[bool] = Field(default=False, validation_alias="IS_TOPIC_READ_FROM_BEGINNING")
    group_id: Optional[str] = Field(default="kafka-mcp-group", validation_alias="DEFAULT_GROUP_ID_FOR_CONSUMER")

    def get_kafka_bootstrap_server(self) -> str:
        """
        Get the Kafka location from bootstrap URL.
        """
        return self.bootstrap_server
The above code is the initial load file, which reads the required configs (such as the Kafka bootstrap servers and the topic name) from a .env file. It also provides the appropriate tool descriptions as a global configuration.
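For reference, a minimal .env matching the validation aliases above could look like this (values are placeholders for a local single-broker setup):
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
TOPIC_NAME=mcp_topic
IS_TOPIC_READ_FROM_BEGINNING=False
DEFAULT_GROUP_ID_FOR_CONSUMER=kafka-mcp-group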
kafka.py
import logging
import json
import uuid

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class KafkaConnector:
    """
    Encapsulates the connection to a kafka server and all the methods to interact with it.

    :param kafka_bootstrap_url: The URL of the kafka server.
    :param topic_name: The topic to which the client will talk.
    """

    def __init__(self, kafka_bootstrap_url: str, topic_name: str, group_id: str):
        self.KAFKA_BOOTSTRAP_SERVERS = kafka_bootstrap_url
        self.topic_name = topic_name
        self.group_id = group_id
        self.producer = None

    async def create_producer(self):
        """Create and start a Kafka producer."""
        producer = AIOKafkaProducer(bootstrap_servers=self.KAFKA_BOOTSTRAP_SERVERS)
        await producer.start()
        logger.info(f"Kafka producer started, connected to {self.KAFKA_BOOTSTRAP_SERVERS}")
        self.producer = producer
        return producer

    async def close_producer(self):
        """Close the Kafka producer."""
        await self.producer.stop()
        logger.info("Kafka producer stopped")

    async def publish(self, value):
        """
        Publish a message to the configured Kafka topic.

        Args:
            value: Message value (dict, str, or bytes)
        """
        try:
            key = str(uuid.uuid4())
            # Convert value to bytes if it's not already
            if isinstance(value, dict):
                value_bytes = json.dumps(value).encode('utf-8')
            elif isinstance(value, str):
                value_bytes = value.encode('utf-8')
            else:
                value_bytes = value
            # Convert key to bytes if it's not None and not already bytes
            key_bytes = None
            if key is not None:
                if isinstance(key, str):
                    key_bytes = key.encode('utf-8')
                else:
                    key_bytes = key
            # Send message
            await self.producer.send_and_wait(self.topic_name, value=value_bytes, key=key_bytes)
            logger.info(f"Published message with key {key} to topic {self.topic_name}")
        except Exception as e:
            logger.error(f"Error publishing message: {e}")
            raise

    async def consume(self, from_beginning=True):
        """
        Consume messages from the configured Kafka topics.

        Args:
            from_beginning: Whether to start consuming from the beginning
        """
        # Convert single topic to list
        if isinstance(self.topic_name, str):
            topics = [self.topic_name]
        else:
            topics = list(self.topic_name)
        # Set auto_offset_reset based on from_beginning
        auto_offset_reset = 'earliest' if from_beginning else 'latest'
        # Create consumer
        consumer = AIOKafkaConsumer(
            *topics,
            bootstrap_servers=self.KAFKA_BOOTSTRAP_SERVERS,
            group_id=self.group_id,
            auto_offset_reset=auto_offset_reset,
            enable_auto_commit=True,
        )
        # Start consumer
        await consumer.start()
        logger.info(f"Kafka consumer started, subscribed to {topics}")
        messages = []
        try:
            # Get a batch of messages with timeout
            batch = await consumer.getmany(timeout_ms=5000)
            for tp, msgs in batch.items():
                for msg in msgs:
                    logger.info(f"Raw message received: {msg}")
                    processed_message = await self._process_message(msg)
                    messages.append(processed_message)
            return messages
        finally:
            # Close consumer
            await consumer.stop()
            logger.info("Kafka consumer stopped")

    async def _process_message(self, msg):
        """
        Process a message received from Kafka.

        Args:
            msg: Message object from Kafka
        """
        try:
            # Decode the message value
            if msg.value:
                try:
                    value = json.loads(msg.value.decode('utf-8'))
                except json.JSONDecodeError:
                    value = msg.value.decode('utf-8')
            else:
                value = None
            # Decode the message key
            key = msg.key.decode('utf-8') if msg.key else None
            logger.info(f"Received message: Topic={msg.topic}, Partition={msg.partition}, "
                        f"Offset={msg.offset}, Key={key}, Value={value}")
            return value
        except Exception as e:
            logger.error(f"Error processing message: {e}")
            raise
The kafka.py file provides all the operations required to contact the Kafka server, publish data to a topic, and consume it back.
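As a quick sanity check, a minimal standalone script (assuming a local broker at localhost:9092 and the mcp_topic topic from the connector configuration) can exercise the class outside of MCP:
import asyncio

from kafka import KafkaConnector


async def main():
    connector = KafkaConnector(kafka_bootstrap_url="localhost:9092",
                               topic_name="mcp_topic",
                               group_id="kafka-mcp-group")
    await connector.create_producer()
    # Dicts are serialized to JSON bytes before being sent
    await connector.publish(value={"hello": "world"})
    await connector.close_producer()
    # Read the batch back from the beginning of the topic
    print(await connector.consume(from_beginning=True))


asyncio.run(main())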
server.py
import logging
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator

from mcp.server import Server
from mcp.server.fastmcp import Context, FastMCP

from kafka import KafkaConnector
from settings import KafkaSettings, ToolSettings

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


@asynccontextmanager
async def server_lifespan(server: Server) -> AsyncIterator[dict]:
    """
    Context manager to handle the lifespan of the server.
    This is used to configure the kafka connector.
    All the configuration is loaded from the environment variables;
    Settings handle that for us.
    """
    try:
        kafka_configurations = KafkaSettings()
        logger.info(
            f"Connecting to kafka at {kafka_configurations.get_kafka_bootstrap_server()}"
        )
        kafka_connector = KafkaConnector(kafka_bootstrap_url=kafka_configurations.bootstrap_server,
                                         topic_name=kafka_configurations.topic_name,
                                         group_id=kafka_configurations.group_id)
        await kafka_connector.create_producer()
        yield {
            "kafka_connector": kafka_connector,
        }
    except Exception as e:
        logger.error(e)
        raise e
    finally:
        pass


# FastMCP is an alternative interface for declaring the capabilities
# of the server. Its API is based on FastAPI.
mcp = FastMCP("mcp-server-kafka", lifespan=server_lifespan)

# Load the tool settings from the env variables, if they are set,
# or use the default values otherwise.
tool_settings = ToolSettings()


@mcp.tool(name="kafka-publish", description=tool_settings.tool_publish_description)
async def publish(ctx: Context, information: Any) -> str:
    """
    Publish the given information to the configured Kafka topic.

    :param ctx: The MCP request context.
    :param information: The payload to publish.
    :return: A confirmation string.
    """
    await ctx.debug(f"Storing information {information} in kafka topic")
    kafka_connector: KafkaConnector = ctx.request_context.lifespan_context[
        "kafka_connector"
    ]
    await kafka_connector.publish(value=information)
    return f"published: {information}"


@mcp.tool(name="kafka-consume", description=tool_settings.tool_consume_description)
async def consumer(ctx: Context) -> str:
    """
    Consume a batch of messages from the configured Kafka topic.

    :param ctx: The MCP request context.
    :return: The consumed messages.
    """
    await ctx.debug("consuming information from kafka")
    kafka_connector: KafkaConnector = ctx.request_context.lifespan_context[
        "kafka_connector"
    ]
    information = await kafka_connector.consume()
    return f"consumed: {information}"
The above server.py code is the actual file that creates the MCP server and configures two tools, kafka-publish and kafka-consume, which are visible to clients to use.
main.py
This is just the driver code.
import argparse

from dotenv import load_dotenv, find_dotenv

load_dotenv(find_dotenv())


def main():
    """
    Main entry point for the mcp-server-kafka script.
    It runs the MCP server with a specific transport protocol.
    """
    # Parse the command-line arguments to determine the transport protocol.
    parser = argparse.ArgumentParser(description="mcp-server-kafka")
    parser.add_argument(
        "--transport",
        choices=["stdio", "sse"],
        default="stdio",
    )
    args = parser.parse_args()

    # Import is done here to make sure the environment variables are loaded
    # before the server module reads them.
    from server import mcp
    mcp.run(transport=args.transport)


if __name__ == "__main__":
    main()
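With the driver in place, the server can be launched locally over stdio (the transport Claude Desktop uses) or over SSE; a sketch, with paths adjusted to your checkout:
python src/kafka_mcp_server/main.py --transport stdio
python src/kafka_mcp_server/main.py --transport sse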
After this, let us make Claude for Desktop aware of the MCP server by editing claude_desktop_config.json:
{
  "mcpServers": {
    "kafka": {
      "command": "/Users/pavanmantha/Pavans/anaconda/anaconda3/bin/python",
      "args": [
        "/Users/pavanmantha/Pavans/PracticeExamples/DataScience_Practice/LLMs/anthropic_tutorials/kafka_mcp_server/src/kafka_mcp_server/main.py"
      ]
    }
  }
}
The Fastembed MCP Server:
The project structure of the fastembed MCP server looks as below.
.
├── LICENSE
├── README.md
├── fastembed_mcp_server.iml
├── requirements.txt
└── src
    └── fastembed_mcp_server
        ├── __init__.py
        ├── fastembed_connector.py
        ├── main.py
        ├── server.py
        └── settings.py
settings.py
The first file reads all the required configurations, mainly the embed_model name and the global tool descriptions as ToolSettings.
from typing import Optional

from pydantic import Field
from pydantic_settings import BaseSettings

DEFAULT_TOOL_LIST_EMBEDDINGS = (
    "List the embeddings that are available with fastembed."
)
DEFAULT_TOOL_EMBED_DOCUMENTS = (
    "Look up given documents to vector embed them. Use this tool when you need to: \n"
    " - vectorize the given documents\n"
)


class ToolSettings(BaseSettings):
    """
    Configuration for all the tools.
    """

    tool_list_embeddings_description: str = Field(
        default=DEFAULT_TOOL_LIST_EMBEDDINGS,
        validation_alias="TOOL_LIST_EMBEDDINGS_DESCRIPTION",
    )
    tool_embed_documents_description: str = Field(
        default=DEFAULT_TOOL_EMBED_DOCUMENTS,
        validation_alias="TOOL_EMBED_DOCUMENTS_DESCRIPTION",
    )


class FastembedSettings(BaseSettings):
    """
    Configuration for the fastembed connector.
    """

    embed_model: Optional[str] = Field(default=None, validation_alias="EMBED_MODEL")

    def get_fastembed_model(self) -> str:
        """
        Get the embedding model name.
        """
        return self.embed_model
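A minimal .env for this server only needs the model name (the alias comes from the settings above; BAAI/bge-small-en-v1.5 is the model we also configure for the Qdrant MCP server later):
EMBED_MODEL=BAAI/bge-small-en-v1.5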
fastembed_connector.py
This is the main file that exposes the actual fastembed operations, such as listing the available models or embedding documents, to the server.
import logging
from typing import List, Dict, Any

from fastembed import TextEmbedding

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class FastembedConnector:
    def __init__(self, embed_model: str):
        self.embedding = TextEmbedding(model_name=embed_model)

    async def supported_embeddings(self) -> List[Dict[str, Any]]:
        """
        Return the list of embedding models supported by fastembed.
        """
        try:
            return TextEmbedding.list_supported_models()
        except Exception as e:
            logger.error(f"Error while fetching supported models: {e}")
            raise

    async def embed_documents(self, value):
        """
        Embed a single document (str) or a list of documents.

        :param value: The document or list of documents to embed.
        :return: The list of embedding vectors.
        """
        try:
            if isinstance(value, str):
                embeddings_generator = self.embedding.embed([value])
            elif isinstance(value, list):
                embeddings_generator = self.embedding.embed(value)
            else:
                raise ValueError("value must be a str or a list of strings")
            embedding = list(embeddings_generator)
            logger.info(f"documents {value} embedded to {embedding}")
            return embedding
        except Exception as e:
            logger.error(f"Error embedding documents: {e}")
            raise
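As with the Kafka connector, a tiny standalone script (assuming the BAAI/bge-small-en-v1.5 model used throughout this article; fastembed downloads it on first run) shows the connector in isolation:
import asyncio

from fastembed_connector import FastembedConnector


async def main():
    connector = FastembedConnector(embed_model="BAAI/bge-small-en-v1.5")
    vectors = await connector.embed_documents(["hello world", "kafka meets qdrant"])
    # bge-small-en-v1.5 produces 384-dimensional vectors
    print(len(vectors), len(vectors[0]))


asyncio.run(main())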
server.py
The actual file to spin up the MCP server and the tools that are exposed to MCP clients (Claude Desktop in our case).
import logging
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator

from mcp.server import Server
from mcp.server.fastmcp import FastMCP, Context

from fastembed_connector import FastembedConnector
from settings import FastembedSettings, ToolSettings

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


@asynccontextmanager
async def server_lifespan(server: Server) -> AsyncIterator[dict]:
    """
    Context manager to handle the lifespan of the server.
    It configures the fastembed connector from the environment variables.
    """
    try:
        fastembed_configuration = FastembedSettings()
        logger.info("Connecting to fastembed")
        fastembed_connector = FastembedConnector(embed_model=fastembed_configuration.embed_model)
        yield {
            "fastembed_connector": fastembed_connector
        }
    except Exception as e:
        logger.error(e)
        raise e


# FastMCP is an alternative interface for declaring the capabilities
# of the server. Its API is based on FastAPI.
mcp = FastMCP("mcp-server-fastembed", lifespan=server_lifespan)

# Load the tool settings from the env variables, if they are set,
# or use the default values otherwise.
tool_settings = ToolSettings()


@mcp.tool(name="supported-embedding-models", description=tool_settings.tool_list_embeddings_description)
async def list_supported_embedding_models(ctx: Context) -> Any:
    await ctx.debug("fetching the list of supported models from fastembed")
    fastembed_connector: FastembedConnector = ctx.request_context.lifespan_context["fastembed_connector"]
    embeddings = await fastembed_connector.supported_embeddings()
    return embeddings


@mcp.tool(name="embed-documents", description=tool_settings.tool_embed_documents_description)
async def embed_documents(ctx: Context, documents: Any) -> Any:
    await ctx.debug(f"embedding the provided documents {documents}")
    fastembed_connector: FastembedConnector = ctx.request_context.lifespan_context["fastembed_connector"]
    embeddings = await fastembed_connector.embed_documents(value=documents)
    return embeddings
main.py
The driver code.
import argparse

from dotenv import load_dotenv, find_dotenv

load_dotenv(find_dotenv())


def main():
    """
    Main entry point for the mcp-server-fastembed script.
    It runs the MCP server with a specific transport protocol.
    """
    # Parse the command-line arguments to determine the transport protocol.
    parser = argparse.ArgumentParser(description="mcp-server-fastembed")
    parser.add_argument(
        "--transport",
        choices=["stdio", "sse"],
        default="stdio",
    )
    args = parser.parse_args()

    # Import is done here to make sure the environment variables are loaded
    # before the server module reads them.
    from server import mcp
    mcp.run(transport=args.transport)


if __name__ == "__main__":
    main()
After this, let us make Claude for Desktop aware of this server too by extending claude_desktop_config.json:
{
  "mcpServers": {
    "kafka": {
      "command": "/Users/pavanmantha/Pavans/anaconda/anaconda3/bin/python",
      "args": [
        "/Users/pavanmantha/Pavans/PracticeExamples/DataScience_Practice/LLMs/anthropic_tutorials/kafka_mcp_server/src/kafka_mcp_server/main.py"
      ]
    },
    "fastembed": {
      "command": "/Users/pavanmantha/Pavans/anaconda/anaconda3/bin/python",
      "args": [
        "/Users/pavanmantha/Pavans/PracticeExamples/DataScience_Practice/LLMs/anthropic_tutorials/fastembed_mcp_server/src/fastembed_mcp_server/main.py"
      ]
    }
  }
}
Installing Qdrant MCP Server:
You can follow the below steps to get the qdrant mcp server up and running.
git clone https://github.com/qdrant/mcp-server-qdrant

QDRANT_URL="http://localhost:6333" \
COLLECTION_NAME="test" \
EMBEDDING_MODEL="BAAI/bge-small-en-v1.5" \
uvx mcp-server-qdrant
{
  "mcpServers": {
    "kafka": {
      "command": "/Users/pavanmantha/Pavans/anaconda/anaconda3/bin/python",
      "args": [
        "/Users/pavanmantha/Pavans/PracticeExamples/DataScience_Practice/LLMs/anthropic_tutorials/kafka_mcp_server/src/kafka_mcp_server/main.py"
      ]
    },
    "fastembed": {
      "command": "/Users/pavanmantha/Pavans/anaconda/anaconda3/bin/python",
      "args": [
        "/Users/pavanmantha/Pavans/PracticeExamples/DataScience_Practice/LLMs/anthropic_tutorials/fastembed_mcp_server/src/fastembed_mcp_server/main.py"
      ]
    },
    "qdrant": {
      "command": "/Users/pavanmantha/.local/bin/uvx",
      "args": [
        "mcp-server-qdrant"
      ],
      "env": {
        "QDRANT_URL": "http://localhost:6333",
        "QDRANT_API_KEY": "qdrant_db_api_key",
        "COLLECTION_NAME": "test",
        "EMBEDDING_MODEL": "BAAI/bge-small-en-v1.5"
      }
    }
  }
}
For more details, see the mcp-server-qdrant repository linked above.
The Results:
With all the above MCP servers and configurations, we have made Claude for Desktop a powerful agent that can carry out complex tasks for us. Every conversation is pushed in a specified format to Kafka, the CDC flow kicks in, and the data eventually lands in Qdrant. You can then search Qdrant directly from Claude to see everything you have done, and all of this happens in near real time.
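To verify that the CDC path really landed your conversations in Qdrant, a short qdrant-client snippet (assuming the local instance and test collection from the configs above; point it at your cloud URL and API key if the sink connector writes there instead) can count and peek at the stored points:
from qdrant_client import QdrantClient

client = QdrantClient(url="http://localhost:6333")
# How many points made it through Kafka -> connector -> Qdrant
print(client.count(collection_name="test"))
# Peek at a few stored points and their payloads
points, _ = client.scroll(collection_name="test", limit=3, with_payload=True)
for point in points:
    print(point.id, point.payload)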
The Conclusion:
This architecture, built on MCP (Model Context Protocol), empowers Claude Desktop to seamlessly interact with Kafka, Qdrant, and FastEmbed, unlocking a highly efficient, scalable, and intelligent AI-driven pipeline. With Kafka handling real-time message streaming, Qdrant enabling vector-based search and retrieval, and FastEmbed accelerating embedding generation, this system is capable of real-time contextual understanding, fast semantic search, and scalable AI deployments. The integration via Kafka-Qdrant connectors ensures a smooth flow of information, making this architecture ideal for applications like personalized assistants, intelligent search engines, recommendation systems, and autonomous decision-making workflows. It is a future-ready AI Agent design that enhances both performance and adaptability in AI-driven ecosystems.