Pulsar
Apache Pulsar is an open-source, distributed messaging and streaming platform built for the cloud.
helm repo add apache-pulsar https://pulsar.apache.org/charts
helm repo update
Install Pulsar, either with Plane or directly with Helm.
With Plane:
plane up datalayer-pulsar
With Helm, write a values file and install the chart:
cat << 'EOF' > /tmp/values.yaml
volumes:
  persistence: true
components:
  functions: false
  toolset: false
  proxy: false
  pulsar_manager: false
kube-prometheus-stack:
  enabled: false
bookkeeper:
  replicaCount: 2
broker:
  replicaCount: 2
proxy:
  podMonitor:
    enabled: false
zookeeper:
  replicaCount: 2
EOF
export RELEASE=datalayer-pulsar
export NAMESPACE=datalayer-pulsar
helm upgrade \
--install $RELEASE \
  apache-pulsar/pulsar \
--create-namespace \
--namespace $NAMESPACE \
--values /tmp/values.yaml \
--timeout 10m
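The values above run two ZooKeeper replicas. ZooKeeper stays available only while a strict majority of the ensemble is up, so the replica count determines how many pod failures the cluster can absorb. The sketch below works through that arithmetic. The function name is illustrative and is not part of Plane or the chart.

```python
def zookeeper_quorum(replicas: int) -> tuple[int, int]:
    """Return (quorum size, tolerated failures) for a ZooKeeper ensemble.

    A quorum is a strict majority of the nodes; the ensemble keeps working
    as long as at least that many nodes are up.
    """
    if replicas < 1:
        raise ValueError("an ensemble needs at least one node")
    quorum = replicas // 2 + 1
    tolerated = replicas - quorum
    return quorum, tolerated


for n in (1, 2, 3, 5):
    quorum, tolerated = zookeeper_quorum(n)
    print(f"{n} replicas: quorum {quorum}, tolerates {tolerated} failure(s)")
```

With replicaCount: 2 the quorum is 2, so losing either ZooKeeper pod makes the ensemble unavailable. That is usually acceptable for a development cluster. Three replicas is the smallest ensemble that survives one failure.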
Check the availability of the Pulsar Pods.
kubectl get pods -n datalayer-pulsar
# NAME                                 READY   STATUS      RESTARTS   AGE
# datalayer-pulsar-bookie-0            1/1     Running     0          10m
# datalayer-pulsar-bookie-1            1/1     Running     0          10m
# datalayer-pulsar-bookie-init-brzdj   0/1     Completed   0          10m
# datalayer-pulsar-broker-0            1/1     Running     0          10m
# datalayer-pulsar-broker-1            1/1     Running     0          10m
# datalayer-pulsar-pulsar-init-5plc6   0/1     Completed   0          10m
# datalayer-pulsar-recovery-0          1/1     Running     0          10m
# datalayer-pulsar-zookeeper-0         1/1     Running     0          10m
# datalayer-pulsar-zookeeper-1         1/1     Running     0          10m
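You can also check the listing from Python instead of reading it by eye. The sketch below is a hypothetical helper, not part of Plane or the chart. It parses the output of kubectl get pods -n datalayer-pulsar -o json and returns every pod that is neither completed (like the two init jobs) nor running with all of its containers ready.

```python
import json
import subprocess


def unhealthy_pods(pod_list: dict) -> list[str]:
    """Return the names of pods that are not healthy yet.

    Completed jobs (phase "Succeeded") count as healthy; running pods must
    report ready=True for every container.
    """
    bad = []
    for pod in pod_list.get("items", []):
        name = pod["metadata"]["name"]
        status = pod.get("status", {})
        phase = status.get("phase")
        if phase == "Succeeded":
            continue
        statuses = status.get("containerStatuses", [])
        all_ready = bool(statuses) and all(c.get("ready") for c in statuses)
        if phase != "Running" or not all_ready:
            bad.append(name)
    return bad


def main(namespace: str = "datalayer-pulsar") -> None:
    # Shells out to kubectl, so it needs the same cluster access as above.
    out = subprocess.run(
        ["kubectl", "get", "pods", "-n", namespace, "-o", "json"],
        check=True,
        capture_output=True,
        text=True,
    ).stdout
    bad = unhealthy_pods(json.loads(out))
    print("all pods healthy" if not bad else f"not ready: {', '.join(bad)}")
```

Call main() from a script or REPL that has kubectl configured. Treating the Succeeded phase as healthy keeps the completed init jobs out of the report.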
To interact with Pulsar from your host, first forward the Pulsar ports from the cluster.
plane pf-pulsar
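To connect from your host to the Pulsar service, add the following entries to your /etc/hosts file. The brokers advertise their in-cluster DNS names to clients, so these names must resolve to 127.0.0.1, where the ports are forwarded.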
# /etc/hosts
127.0.0.1 datalayer-pulsar-broker.datalayer-pulsar.svc.cluster.local datalayer-pulsar-broker-0.datalayer-pulsar-broker.datalayer-pulsar.svc.cluster.local datalayer-pulsar-broker-1.datalayer-pulsar-broker.datalayer-pulsar.svc.cluster.local datalayer-pulsar-broker-2.datalayer-pulsar-broker.datalayer-pulsar.svc.cluster.local
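The entries above follow a regular pattern. First comes the broker service name, then one name per broker pod of the form <release>-broker-<i>.<release>-broker.<namespace>.svc.cluster.local. If you change the release name, namespace or broker count, a helper like the hypothetical pulsar_hosts_line below can regenerate the line. The pattern is inferred from the entries above rather than taken from the chart's documentation.

```python
def pulsar_hosts_line(release: str, namespace: str, brokers: int,
                      domain: str = "cluster.local", ip: str = "127.0.0.1") -> str:
    """Build an /etc/hosts line that maps the broker DNS names to a local IP."""
    service = f"{release}-broker"
    suffix = f"{namespace}.svc.{domain}"
    # The broker service name itself, used as the client's service URL.
    names = [f"{service}.{suffix}"]
    # One entry per broker pod: <pod>.<service>.<namespace>.svc.<domain>
    names += [f"{service}-{i}.{service}.{suffix}" for i in range(brokers)]
    return " ".join([ip, *names])


print(pulsar_hosts_line("datalayer-pulsar", "datalayer-pulsar", brokers=2))
```

With brokers=3 the helper reproduces the line above exactly, including the broker-2 entry. With replicaCount: 2 that entry is unused but harmless.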
You can check the connection to Pulsar with the following Python code, which requires the pulsar-client package (pip install pulsar-client).
# Producer.
import json
import logging

import pulsar

logger = logging.getLogger(__name__)

client = pulsar.Client("pulsar://datalayer-pulsar-broker.datalayer-pulsar.svc.cluster.local:6650", logger=logger)
producer: pulsar.Producer = client.create_producer("test_topic", properties={"producer-name": "test_producer"})
print(producer.is_connected())

# Send a JSON payload encoded as bytes and print the message ID returned by the broker.
message_id = producer.send(
    json.dumps(
        {
            "key_1": "value_1",
        }
    ).encode(errors="replace")
)
print(message_id)

client.close()
# Consumer.
import json
import logging

import pulsar
from pulsar import InitialPosition

logger = logging.getLogger(__name__)

client = pulsar.Client("pulsar://datalayer-pulsar-broker.datalayer-pulsar.svc.cluster.local:6650", logger=logger)
# Start a new subscription from the oldest available message so the one sent above is received.
consumer: pulsar.Consumer = client.subscribe("test_topic", "test_subscription", consumer_name="test_consumer", initial_position=InitialPosition.Earliest)

message = consumer.receive()  # Blocks until a message arrives.
print(message)
data = json.loads(message.data())
print(data)
# Acknowledge the message so the broker does not redeliver it to this subscription.
consumer.acknowledge(message)

client.close()
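The consumer above reads a single message. To read everything currently pending on the subscription, you can loop with a receive timeout and acknowledge each message. The sketch below assumes pulsar-client 3.x, where Consumer.receive(timeout_millis=...) raises pulsar.Timeout when nothing arrives in time. The exception type is passed in so the helper can be exercised without a broker, and the name drain is illustrative.

```python
import json
from typing import Any


def drain(consumer: Any, timeout_exc: type[BaseException],
          timeout_millis: int = 1000, max_messages: int = 100) -> list[Any]:
    """Receive, decode and acknowledge JSON messages until a timeout or the limit."""
    payloads = []
    while len(payloads) < max_messages:
        try:
            message = consumer.receive(timeout_millis=timeout_millis)
        except timeout_exc:
            break  # Nothing arrived within the timeout: treat the backlog as empty.
        payloads.append(json.loads(message.data()))
        consumer.acknowledge(message)  # Ack so the broker does not redeliver it.
    return payloads
```

With the consumer from the previous snippet, call print(drain(consumer, pulsar.Timeout)) before client.close(). The max_messages cap keeps the loop bounded if producers keep publishing.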
Tear down Pulsar if needed, either with Plane or with Helm.
With Plane:
plane down datalayer-pulsar
With Helm:
export RELEASE=datalayer-pulsar
export NAMESPACE=datalayer-pulsar
helm delete $RELEASE --namespace $NAMESPACE
Spin Up Pulsar Manager
Pulsar Manager provides a web-based GUI to manage and monitor Pulsar. It is not deployed by default (the Helm values above set pulsar_manager: false), but you can run an instance with the following commands.
Do not use this deployment in production: it is unsecured and creates an admin account with a hard-coded password.
With your Kubernetes configuration loaded, run the following snippet to start and initialize the manager:
cat << 'EOF' > /tmp/pulsar-manager.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: pulsar-manager
  namespace: datalayer-pulsar
spec:
  replicas: 1
  selector:
    matchLabels:
      app: pulsar-manager
  template:
    metadata:
      labels:
        app: pulsar-manager
    spec:
      containers:
        # The Pulsar Manager container (web UI on port 9527, backend API on port 7750).
        - name: pulsar-manager
          image: apachepulsar/pulsar-manager:v0.3.0
          env:
            - name: SPRING_CONFIGURATION_FILE
              value: /pulsar-manager/pulsar-manager/application.properties
        # Container to initialize an admin account once the backend is up.
        - name: set-pwd
          image: curlimages/curl:8.10.1
          command: ["sh", "-c"]
          args:
            - |
              sleep 10
              export CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token)
              # Double quotes so the shell expands $CSRF_TOKEN into the headers.
              curl \
                -H "X-XSRF-TOKEN: $CSRF_TOKEN" \
                -H "Cookie: XSRF-TOKEN=$CSRF_TOKEN;" \
                -H "Content-Type: application/json" \
                -X PUT http://localhost:7750/pulsar-manager/users/superuser \
                -d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "username@test.org"}'
              # Keep running so the Deployment does not restart the container and repeat the setup.
              tail -f /dev/null
EOF
kubectl apply -f /tmp/pulsar-manager.yaml
export POD_NAME=$(kubectl get pod -n datalayer-pulsar -l app=pulsar-manager -o jsonpath="{.items[0].metadata.name}")
kubectl port-forward -n datalayer-pulsar pod/$POD_NAME 9527:9527
Then open http://localhost:9527 and log in as admin with the password apachepulsar.
Finally, define a new environment with the following parameters:
- Environment name: any name you choose
- Service URL: http://datalayer-pulsar-broker.datalayer-pulsar.svc.cluster.local:8080
- Bookie URL: http://datalayer-pulsar-bookie.datalayer-pulsar.svc.cluster.local:8000
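The set-pwd container in the deployment above performs a two-step handshake with the Pulsar Manager backend on port 7750. First it fetches a CSRF token from /pulsar-manager/csrf-token. Then it sends that token back, both as the X-XSRF-TOKEN header and as the XSRF-TOKEN cookie, with the PUT request that creates the superuser. If you need to re-run that setup by hand, the standard-library sketch below builds the same request. It assumes you also forward port 7750 (for example kubectl port-forward -n datalayer-pulsar pod/$POD_NAME 9527:9527 7750:7750), and the function names are illustrative.

```python
import json
import urllib.request

# Assumes port 7750 of the pulsar-manager pod is forwarded to localhost.
BASE_URL = "http://localhost:7750/pulsar-manager"


def fetch_csrf_token(base_url: str = BASE_URL) -> str:
    """GET the CSRF token that the backend expects on state-changing calls."""
    with urllib.request.urlopen(f"{base_url}/csrf-token") as resp:
        return resp.read().decode().strip()


def superuser_request(token: str, name: str, password: str, email: str,
                      description: str = "",
                      base_url: str = BASE_URL) -> urllib.request.Request:
    """Build the PUT request that creates the superuser, echoing the token twice."""
    body = json.dumps({"name": name, "password": password,
                       "description": description, "email": email}).encode()
    return urllib.request.Request(
        f"{base_url}/users/superuser",
        data=body,
        method="PUT",
        headers={
            "X-XSRF-TOKEN": token,
            "Cookie": f"XSRF-TOKEN={token};",
            "Content-Type": "application/json",
        },
    )
```

To send it, run urllib.request.urlopen(superuser_request(fetch_csrf_token(), "admin", "apachepulsar", "username@test.org", "test")). Building the request separately from sending it makes the headers easy to inspect before anything reaches the server.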