
Pulsar

Apache Pulsar is an open-source, distributed messaging and streaming platform built for the cloud.

Add the Apache Pulsar Helm chart repository.

helm repo add apache-pulsar https://pulsar.apache.org/charts
helm repo update

Install Pulsar.

plane up datalayer-pulsar

Check the availability of the Pulsar Pods.

kubectl get pods -n datalayer-pulsar

# NAME READY STATUS RESTARTS AGE
# datalayer-pulsar-bookie-0 1/1 Running 0 10m
# datalayer-pulsar-bookie-1 1/1 Running 0 10m
# datalayer-pulsar-bookie-init-brzdj 0/1 Completed 0 10m
# datalayer-pulsar-broker-0 1/1 Running 0 10m
# datalayer-pulsar-broker-1 1/1 Running 0 10m
# datalayer-pulsar-pulsar-init-5plc6 0/1 Completed 0 10m
# datalayer-pulsar-recovery-0 1/1 Running 0 10m
# datalayer-pulsar-zookeeper-0 1/1 Running 0 10m
# datalayer-pulsar-zookeeper-1 1/1 Running 0 10m
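Every Pod should be Running with all of its containers ready, except the one-shot init Pods (*-bookie-init-* and *-pulsar-init-*), which end in Completed. The hypothetical helper below is a minimal sketch of that check, assuming the default column layout of kubectl get pods (NAME, READY, STATUS, ...); it is not part of the plane tooling.

```python
def unhealthy_pods(kubectl_output: str) -> list:
    """Return the names of Pods that are neither fully ready nor Completed.

    Hypothetical helper: parses the plain-text table printed by
    `kubectl get pods` (columns NAME READY STATUS RESTARTS AGE).
    """
    bad = []
    for line in kubectl_output.splitlines():
        fields = line.split()
        if not fields or fields[0] == "NAME":
            continue  # skip blank lines and the header row
        name, ready, status = fields[0], fields[1], fields[2]
        ready_count, total = map(int, ready.split("/"))
        if status == "Completed":
            continue  # init Pods run once and exit, which is expected
        if status != "Running" or ready_count != total:
            bad.append(name)
    return bad
```

To run it against the live cluster, pass it the output of subprocess.run(["kubectl", "get", "pods", "-n", "datalayer-pulsar"], capture_output=True, text=True, check=True).stdout; an empty list means the deployment looks healthy.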

To interact with Pulsar from your host, first set up port forwarding.

plane pf-pulsar
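A quick way to confirm that the forwarding is active is to open a TCP connection to the forwarded port. The sketch below assumes that plane pf-pulsar forwards the broker binary protocol port 6650 to localhost, which is the port used in the pulsar:// URL of the Python examples on this page; the function name is illustrative.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        # create_connection raises OSError (e.g. ConnectionRefusedError) on failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For example, port_is_open("localhost", 6650) should return True while the port forward is running and False once it has been stopped.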

To connect from your host to the Pulsar service, add the following entry to your /etc/hosts file.

# /etc/hosts
127.0.0.1 datalayer-pulsar-broker.datalayer-pulsar.svc.cluster.local datalayer-pulsar-broker-0.datalayer-pulsar-broker.datalayer-pulsar.svc.cluster.local datalayer-pulsar-broker-1.datalayer-pulsar-broker.datalayer-pulsar.svc.cluster.local datalayer-pulsar-broker-2.datalayer-pulsar-broker.datalayer-pulsar.svc.cluster.local
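The entry follows a fixed pattern: the broker Service name, then one name per broker Pod of the form <pod>.<service>.<namespace>.svc.cluster.local. The per-Pod names are presumably needed because, during topic lookup, the brokers hand back their own Pod DNS names to the client. The entry above lists broker-0 to broker-2, while the Pod listing shows two brokers; the hypothetical helper below is a sketch that generates the line for any broker count, assuming the datalayer-pulsar release and namespace naming used on this page.

```python
def hosts_entry(release: str, namespace: str, brokers: int, ip: str = "127.0.0.1") -> str:
    """Build an /etc/hosts line mapping the broker Service and Pod names to ip.

    Hypothetical helper; assumes the broker Service is named <release>-broker
    and its StatefulSet Pods are <release>-broker-0 .. <release>-broker-(n-1).
    """
    service = f"{release}-broker"
    suffix = f"{namespace}.svc.cluster.local"
    names = [f"{service}.{suffix}"]  # the broker Service itself
    # Each StatefulSet Pod is addressable as <pod>.<service>.<namespace>.svc.cluster.local.
    names += [f"{service}-{i}.{service}.{suffix}" for i in range(brokers)]
    return " ".join([ip] + names)
```

Calling hosts_entry("datalayer-pulsar", "datalayer-pulsar", 3) reproduces the entry above; pass 2 to match the two brokers shown in the Pod listing.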

You can check the connection to Pulsar with the following Python code. It requires the pulsar-client package (pip install pulsar-client).

# Producer.
import json
import logging

import pulsar

logger = logging.getLogger(__name__)

# The broker hostname resolves to 127.0.0.1 through the /etc/hosts entry above.
client = pulsar.Client("pulsar://datalayer-pulsar-broker.datalayer-pulsar.svc.cluster.local:6650", logger=logger)

producer: pulsar.Producer = client.create_producer("test_topic", properties={"producer-name": "test_producer"})
print(producer.is_connected())

# Message payloads are raw bytes: serialize the dict as JSON, then encode it.
message_id = producer.send(
    json.dumps(
        {
            "key_1": "value_1",
        }
    ).encode(errors="replace")
)
print(message_id)

# Release the producer and the connection to the broker.
client.close()
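
# Consumer.
import json
import logging

import pulsar
from pulsar import InitialPosition

logger = logging.getLogger(__name__)

client = pulsar.Client("pulsar://datalayer-pulsar-broker.datalayer-pulsar.svc.cluster.local:6650", logger=logger)

# A new subscription starts from the earliest message retained in the topic,
# so the message sent by the producer is received even if the producer ran first.
consumer: pulsar.Consumer = client.subscribe("test_topic", "test_subscription", consumer_name="test_consumer", initial_position=InitialPosition.Earliest)
message = consumer.receive()  # blocks until a message is available
print(message)
data = json.loads(message.data())
print(data)

# Acknowledge the message so it is not redelivered to this subscription.
consumer.acknowledge(message)
client.close()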
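The producer and the consumer only work together because both sides agree on the payload format: UTF-8 encoded JSON objects. One way to keep that contract in a single place is a pair of shared helpers. The sketch below is hypothetical (the names are not part of pulsar-client) and uses only the standard library.

```python
import json
from typing import Any, Dict


def encode_payload(payload: Dict[str, Any]) -> bytes:
    """Serialize a dict to the bytes passed to producer.send()."""
    # json.dumps escapes non-ASCII characters by default, so encoding cannot fail.
    return json.dumps(payload).encode("utf-8")


def decode_payload(data: bytes) -> Dict[str, Any]:
    """Deserialize the bytes returned by message.data() into a dict."""
    obj = json.loads(data.decode("utf-8"))
    if not isinstance(obj, dict):
        raise ValueError(f"expected a JSON object, got {type(obj).__name__}")
    return obj
```

With these helpers, the producer would call producer.send(encode_payload({"key_1": "value_1"})) and the consumer would call decode_payload(message.data()).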

Tear down Pulsar if needed.

plane down datalayer-pulsar

Spin up Pulsar Manager

Pulsar Manager provides a web-based GUI to manage and monitor Pulsar. It is not deployed by default, but you can set up an instance using the following commands.

Caution: the following deployment is not secured and must not be used in production.

After loading the Kubernetes configuration, run the following snippet to start the manager and initialize its admin account:

cat << 'EOF' > /tmp/pulsar-manager.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: pulsar-manager
  namespace: datalayer-pulsar
spec:
  replicas: 1
  selector:
    matchLabels:
      app: pulsar-manager
  template:
    metadata:
      labels:
        app: pulsar-manager
    spec:
      containers:
        # The Pulsar Manager container (web UI on port 9527, backend API on port 7750).
        - name: pulsar-manager
          image: apachepulsar/pulsar-manager:v0.3.0
          env:
            - name: SPRING_CONFIGURATION_FILE
              value: /pulsar-manager/pulsar-manager/application.properties
        # Container to initialize an admin account through the backend API.
        - name: set-pwd
          image: curlimages/curl:8.10.1
          command: ["sh", "-c"]
          args:
            - |
              # Wait until the backend answers instead of relying on a fixed delay.
              until curl -sf http://localhost:7750/pulsar-manager/csrf-token > /dev/null; do sleep 2; done
              CSRF_TOKEN=$(curl -s http://localhost:7750/pulsar-manager/csrf-token)
              # Double quotes so that the shell expands $CSRF_TOKEN.
              curl \
                -H "X-XSRF-TOKEN: $CSRF_TOKEN" \
                -H "Cookie: XSRF-TOKEN=$CSRF_TOKEN;" \
                -H "Content-Type: application/json" \
                -X PUT http://localhost:7750/pulsar-manager/users/superuser \
                -d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "username@test.org"}'
              # Keep the container alive so the Deployment does not restart it and re-run the setup.
              while true; do sleep 3600; done
EOF
kubectl apply -f /tmp/pulsar-manager.yaml
# Wait until the Deployment is available before looking up its Pod.
kubectl rollout status deployment/pulsar-manager -n datalayer-pulsar
export POD_NAME=$(kubectl get pod -n datalayer-pulsar -l app=pulsar-manager -o jsonpath="{.items[0].metadata.name}")
kubectl port-forward -n datalayer-pulsar pod/$POD_NAME 9527:9527
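If the admin account was not created (for example because the set-pwd container failed), the same initialization can be done from your host. The sketch below mirrors the curl calls of the set-pwd container: it sends the CSRF token both as the X-XSRF-TOKEN header and as the XSRF-TOKEN cookie. It assumes that port 7750 of the Pod is also forwarded (for example with kubectl port-forward -n datalayer-pulsar pod/$POD_NAME 7750:7750) and that the csrf-token endpoint returns the token as plain text; both function names are hypothetical.

```python
import json
import urllib.request


def fetch_csrf_token(base_url: str) -> str:
    """GET the CSRF token from the Pulsar Manager backend (assumed plain-text body)."""
    with urllib.request.urlopen(f"{base_url}/pulsar-manager/csrf-token", timeout=5) as resp:
        return resp.read().decode("utf-8").strip()


def superuser_request(base_url: str, csrf_token: str, name: str, password: str,
                      email: str, description: str = "") -> urllib.request.Request:
    """Build the PUT request that creates the Pulsar Manager superuser."""
    body = json.dumps({
        "name": name,
        "password": password,
        "description": description,
        "email": email,
    }).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/pulsar-manager/users/superuser",
        data=body,
        method="PUT",
        headers={
            # Same token in the header and in the cookie, as in the curl command.
            "X-XSRF-TOKEN": csrf_token,
            "Cookie": f"XSRF-TOKEN={csrf_token};",
            "Content-Type": "application/json",
        },
    )
```

Under these assumptions, with base = "http://localhost:7750", the account is created by calling urllib.request.urlopen(superuser_request(base, fetch_csrf_token(base), "admin", "apachepulsar", "username@test.org", "test")). Building the request separately from sending it keeps the header and body layout easy to inspect.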

Then open http://localhost:9527 and log in as admin with the password apachepulsar.

Finally, define a new environment with the following parameters: