Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import kotlinx.coroutines.*
- import org.eclipse.paho.client.mqttv3.*
- class MqttWrapper(
- private val brokerUrl: String,
- private val clientId: String,
- private val callbackScope: CoroutineScope = CoroutineScope(Dispatchers.Main)
- ) {
- private var mqttClient: MqttClient? = null
- // Variable to hold the latest received MQTT message payload
- var latestMessage: String? = null
- private set
- // Callback to notify when a new message arrives
- var onMessageReceived: ((topic: String, message: String) -> Unit)? = null
- fun connect() {
- try {
- mqttClient = MqttClient(brokerUrl, clientId, null)
- val options = MqttConnectOptions()
- options.isCleanSession = true
- mqttClient?.setCallback(object : MqttCallback {
- override fun connectionLost(cause: Throwable?) {
- // Handle loss of connection if needed
- }
- override fun messageArrived(topic: String?, message: MqttMessage?) {
- val msgStr = message?.toString() ?: ""
- latestMessage = msgStr
- // Invoke the callback on the main thread
- callbackScope.launch {
- onMessageReceived?.invoke(topic ?: "", msgStr)
- }
- }
- override fun deliveryComplete(token: IMqttDeliveryToken?) {
- // Optionally handle delivery complete
- }
- })
- mqttClient?.connect(options)
- } catch (e: MqttException) {
- e.printStackTrace()
- }
- }
- fun subscribe(topic: String, qos: Int = 1) {
- try {
- mqttClient?.subscribe(topic, qos)
- } catch (e: MqttException) {
- e.printStackTrace()
- }
- }
- fun publish(topic: String, message: String, qos: Int = 1) {
- try {
- val mqttMessage = MqttMessage(message.toByteArray())
- mqttMessage.qos = qos
- mqttClient?.publish(topic, mqttMessage)
- } catch (e: MqttException) {
- e.printStackTrace()
- }
- }
- fun disconnect() {
- try {
- mqttClient?.disconnect()
- } catch (e: MqttException) {
- e.printStackTrace()
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement