1.23 Publishing messages with MQTT, using Clojure and the Eclipse Paho client
=============================================================================

2013-06-02

  I have found that MQTT (https://mqtt.org) (MQ Telemetry Protocol) is
quite simple to use, and especially so when compared to most other
messaging protocols.  In particular it's a great fit for embedded
devices or whenever resources are limited, since it's quite lightweight.
From the MQTT page:

    MQTT is a machine-to-machine (M2M)/”Internet of Things”
    connectivity protocol.  It was designed as an extremely lightweight
    publish/subscribe messaging transport.  It is useful for
    connections with remote locations where a small code footprint is
    required and/or network bandwidth is at a premium.  For example, it
    has been used in sensors communicating to a broker via satellite
    link, over occasional dial-up connections with healthcare
    providers, and in a range of home automation and small device
    scenarios.  It is also ideal for mobile applications because of its
    small size, low power usage, minimised data packets, and efficient
    distribution of information to one or many receivers
  On top of that I can see it being used when the needs themselves are
straightforward, which is often the case: just publish a payload to a
topic somewhere and avoid worrying with keeping up connection state,
error handling, etc.  Additionally it has been proposed as an OASIS
standard which is good news for those looking for a messaging (and M2M
in particular)

  There are many implementation options to choose from (see the MQTT
software page for more details), this example code is based on the
Eclipse Paho implementation.  See the full code on the github gist
(https://gist.github.com/fsmunoz/5844443#file-mqtt-clj), the following
is a snippet:

    ;; Simple example of MQTT message publish using Clojure
    ;;
    ;; Uses the Websphere Eclipse Paho client
    ;;
    ;; Author:   Frederico Munoz <[email protected]>
    ;; Date:     18-Jun-2013
    ;; Keywords: mqtt, messaging, m2m, telemetry, clojure, iot, paho
    ;;
    ;; Copying and distribution of this file, with or without modification,
    ;; are permitted in any medium without royalty provided the copyright
    ;; notice and this notice are preserved.  This file is offered as-is,
    ;; without any warranty.
    ;;
    ;; See http://www.eclipse.org/paho/ for the Eclipse Paho
    ;; implementation of MQTT, including the
    ;; org.eclipse.paho.client.mqttv3.jar JAR file which is used in this
    ;; example and should be in the classpath.
    ;;
    ;; Use of leiningen is recommended for any more advanced use, but
    ;; assuming all the jars are in the current directory the following
    ;; should work:
    ;;
    ;; $ java -cp clojure.jar:org.eclipse.paho.client.mqttv3.jar clojure.main mqtt.clj
    ;; Sat Jun 22 17:53:49 WEST 2013 : Sent
    ;; Connected to  tcp://m2m.eclipse.org:1883
    ;; *** PUBLISHING TO TOPIC  cljtest  ***
    ;; *** DELIVERY COMPLETE ***
    ;; $
    ;;
    ;; This code only shows how to publish, but there are several MQTT
    ;; clients that can be used to subscribe to the topic. The Mosquitto
    ;; MQTT implementation (http://mosquitto.org/) comes with a client
    ;; which can be used thus:
    ;;
    ;; $ mosquitto_sub -h m2m.eclipse.org -t "cljtest" -v
    ;; cljtest But at least, out of my bitterness at what I'll never be, There's the quick calligraphy of these lines, The broken archway to the Impossible.
    ;; ^C
    ;; $

    (import 'org.eclipse.paho.client.mqttv3.MqttCallback)
    (import 'org.eclipse.paho.client.mqttv3.MqttClient)
    (import 'org.eclipse.paho.client.mqttv3.MqttConnectOptions)
    (import 'org.eclipse.paho.client.mqttv3.MqttDeliveryToken)
    (import 'org.eclipse.paho.client.mqttv3.MqttException)
    (import 'org.eclipse.paho.client.mqttv3.MqttMessage)
    (import 'org.eclipse.paho.client.mqttv3.MqttTopic)

    ;; Main variables, most likely candidates for additional command line arguments
    ;; Change them according to your environment (the message can be supplied in the cli)
    (def broker-url "tcp://m2m.eclipse.org:1883") ; Eclipse sandbox server, see http://m2m.eclipse.org/sandbox.html for details on use
    (def mqtt-topic-name "cljtest")
    (def mqtt-message (if (seq (first *command-line-args*))   ; Message, either the first
                        (first *command-line-args*)           ; command line argument or a snippet from Tobacco Shop
                        "But at least, out of my bitterness at what I'll never be, There's the quick calligraphy of these lines, The broken archway to the Impossible."))
    (def client-id "FSM-Clj-Test")

    (defn- mqtt-callback
      "Function called after delivery confirmation"
      []
      (reify MqttCallback
        (connectionLost [_ cause]
          (println (.toString cause)))
        (messageArrived [_ topic message]
          (println "Topic: " (.getName topic))
          (println "Message: " (.getPayload message)))
        (deliveryComplete [_ token]
          (println "*** DELIVERY COMPLETE ***"))))

    (defn- mqtt-connect
      "Establishes a MQTT connection to TOPIC; returns the mqtt client object"
      [topic]
      (let [mqtt-conn-options (MqttConnectOptions.)
            mqtt-client (MqttClient. broker-url client-id)]
        (doto mqtt-conn-options
          (.setCleanSession 'true)
          (.setKeepAliveInterval 30))
        (println "Connected to " broker-url)
        (.setCallback mqtt-client (mqtt-callback))
        (.connect mqtt-client mqtt-conn-options)
        mqtt-client))

    (defn- mqtt-create-message
      "Creates a MQTT message from MESSAGE, a string"
      [message]
      (let [mqtt-message (MqttMessage. (.getBytes message))]
        (doto mqtt-message
          (.setQos  0)
          (.setRetained false))))

    (defn- mqtt-publish
      "Publishes MESSAGE to TOPIC"
      [topic message]
      (println "*** PUBLISHING TO TOPIC " (.toString topic) " ***")
      (.waitForCompletion (.publish topic message)))

    (defn testmq
      "Main demo function, creates the connection and sends the message"
      []
      (let [client (mqtt-connect mqtt-topic-name)
            topic (.getTopic client mqtt-topic-name)
            message (mqtt-create-message mqtt-message)]
        (mqtt-publish topic message)
        (Thread/sleep 100)
        (.disconnect client)
        (shutdown-agents)))

    ;; Make it all work
    (testmq)

    ;; End of file

  Usage is trivial, here's an example:

    $ java -cp ~/.m2/repository/org/clojure/clojure/1.5.1/clojure-1.5.1.jar:/tmp/dummy/org.eclipse.paho.client.mqttv3.jar clojure.main mqtt.clj
    Connected to tcp://m2m.eclipse.org:1883
    *** PUBLISHING TO TOPIC cljtest ***
    *** DELIVERY COMPLETE ***
    $

  ...  and seeing the publishing using the Mosquitto MQTT client to
subscribe to the topic:

    $ mosquitto_sub -h m2m.eclipse.org -t "cljtest" -v
    cljtest But at least, out of my bitterness at what I'll never be, There's the quick calligraphy of these lines, The broken archway to the Impossible.
    ^C
    $

  Yet again Clojure presents itself as a good way to take advantage of
the Java libs, even when one doesn't use most of other Clojure's
strengths.