\ blob: b591be073e09c202ff1eca58f93eb8e7098a7fa6 [file] [log] [blame]
\ MQTT Client
\ MQTT is a lightweight protocol for transmitting
\ sensor data and receiving control commands over TCP.
\ It relies on a server program to collect data from
\ many sensors and distribute commands to controls.
\ This implementation currently supports only QOS 0 -
\ one-shot, unacknowledged transmission. The message
\ types for supporting the QOS 1 and QOS 2 sequences
\ are defined, but the logic is stubbed-out. See
\ handle-pub{ack,rec,rel,comp}
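\ Define these configuration parameters externally, e.g.:
\ : mqtt-server$ ( -- $ ) " 192.168.2.11" ;
\ : mqtt-client-id$ ( -- $ ) " Sonoff Switch Forth" ;
\ : mqtt-username$ ( -- $ ) " thisDevice" ; \ empty string means no username
\ : mqtt-password$ ( -- $ ) " 12344321" ; \ empty string means no password
\ : mqtt-will$ ( -- msg$ topic$ ) " " " " ; \ empty topic$ means no will
\ 0 value mqtt-will-qos \ 0, 1, or 2
\ 0 value mqtt-will-retain \ 0 or 1
\ 0 value mqtt-clean-session \ nonzero requests a clean session
\ 0 value mqtt-keepalive \ keep-alive interval, sent as a 16-bit value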
\ mqtt-port$ ( -- $ )  TCP port of the MQTT server, as a string.
\ It is passed to stream-connect by mqtt-start.
: mqtt-port$ " 1883" ;
\ Handle of the open connection to the server; set by mqtt-start and
\ used by mqtt-send for every outgoing packet.
0 value mqtt-fd
\ Transmit len bytes starting at adr on the MQTT connection.
\ Aborts if tcp-write reports a negative result.
: mqtt-send  ( adr len -- )
   mqtt-fd tcp-write              ( result )
   0<  abort" tcp-write-failed"   ( )
;
0 [if] \ esp8266 code
\ This section is disabled by the 0 flag above and is never compiled.
\ It holds an alternative transport that writes via rx-pcb rather than
\ mqtt-fd, and discards the tcp-write result instead of aborting on error.
\ XXX change this to use mqtt-fd instead of rx-pcb
: mqtt-send ( adr len -- )
['] tcp-sent rx-pcb tcp-sent ( adr len )
rx-pcb tcp-write drop ( )
;
\ Polling here is just a fixed 100 ms delay
: mqtt-poll ( -- ) #100 ms ;
[then]
0 [if] \ ESP32
\ This section is disabled by the 0 flag above and is never compiled.
\ The server name or IP address is read at startup
\ time from the wifi-on file
\ NOTE(review): the definition below is a hard-coded address, which does
\ not match the comment above - confirm which is intended before enabling.
: server$ " 192.168.2.11" ;
\ Size in bytes of the receive buffer
#256 constant /mqtt-buffer
\ Called as ( adr len fd -- ) with each chunk of received data
defer handle-peer-data
/mqtt-buffer buffer: mqtt-buffer
\ Wait 50 ms, then read up to /mqtt-buffer bytes from fd. A positive
\ count is passed to handle-peer-data along with fd; a zero or
\ negative count is silently ignored.
: mqtt-poll ( fd -- )
>r
#50 ms
mqtt-buffer /mqtt-buffer r@ lwip-read ( count )
dup 0> if ( count )
mqtt-buffer swap r@ handle-peer-data
else ( count )
drop
then
r> drop
;
[then]
\ Vocabulary of topic handlers. subscribe-all subscribes to the name of
\ every word in it, and handle-publish looks up an incoming topic name
\ here and executes the matching word with the payload string on the stack.
vocabulary mqtt-topics
\ be-w@ fetches a 16-bit value stored most-significant byte first: the
\ byte at adr+1 and the byte at adr are combined with bwjoin.
\ It is defined only if no be-w@ already exists.
\needs be-w@ : be-w@ ( adr -- w ) dup 1+ c@ swap c@ bwjoin ;
\ Build the CONNECT flags byte from the configuration words:
\   $80 username present      $40 password present
\   $20 will retain           $18 will QOS (2 bits)
\   $04 will present          $02 clean session
\ A string counts as present when its length is nonzero; the will is
\ present when the topic half of mqtt-will$ is non-empty.
: mqtt-flags  ( -- b )
   mqtt-username$ nip 0<> $80 and       ( b )
   mqtt-password$ nip 0<> $40 and  or   ( b )
   mqtt-will$ nip nip nip  if           ( b )
      $04
      mqtt-will-qos    3 lshift or
      mqtt-will-retain 5 lshift or      ( b will-bits )
      or                                ( b )
   then
   mqtt-clean-session 0<> $02 and  or   ( b )
;
\ Start address of the message currently being assembled in the
\ dictionary at "here", or 0 when no message is in progress.
0 value mqtt-msg

\ Give back the dictionary space used by the message in progress, if any,
\ by moving "here" back to where the message began.
: mqtt-release-msg  ( -- )
   mqtt-msg ?dup  if    ( msg-adr )
      here - allot      ( )   \ Negative allot
      0 to mqtt-msg
   then
;
\ Append one byte to the message being assembled
: m,  ( byte -- )  c,  ;

\ Append a 16-bit value, most-significant byte first
: mw,  ( short -- )
   dup 8 rshift m,     ( short )   \ High byte
   m,                  ( )         \ Low byte
;
\ Append len raw bytes from adr to the message, with no length prefix
: m-payload,  ( adr len -- )
   tuck  here swap move     ( len )   \ Copy to the end of the message
   allot                    ( )       \ Then claim the space
;
\ Append a string in MQTT form: a 16-bit length followed by the bytes
: m$,  ( adr len -- )
   dup mw,  m-payload,
;

\ Append a string in MQTT form only if it is non-empty
: ?m$,  ( adr len -- )
   dup 0=  if  2drop exit  then
   m$,
;
\ Begin a message: remember where it starts, then lay down the control
\ byte and a placeholder length byte that }mqtt fills in later.
: mqtt{  ( control -- )
   here to mqtt-msg
   m,                \ Control byte (packet type and flags)
   0 m,              \ Length placeholder
;
\ Finish the message begun by mqtt{ : fill in the length byte, send the
\ packet, and release the dictionary space it occupied.
\ The length field is the byte count after the 2-byte fixed header.
\ This implementation stores it in a single byte, which can only
\ represent 0..127 (larger values need MQTT's multi-byte encoding), so a
\ longer message is rejected instead of being sent as a malformed packet.
: }mqtt  ( -- )
   mqtt-msg here over -             ( adr len )
   dup 2- $7f >  if                 ( adr len )
      2drop  mqtt-release-msg       ( )
      true abort" MQTT message too long"
   then                             ( adr len )
   2dup 2- swap 1+ c!               ( adr len )  \ Set length field
   mqtt-send                        ( )
   mqtt-release-msg                 ( )
;
\ True once the server has accepted the session (set by the CONNACK handler)
false value mqtt-connack?

\ "Session present" indication from the most recent CONNACK
0 value mqtt-session-present

\ Send a CONNECT packet built from the configuration words.
\ The will topic and message, username and password are included only
\ when non-empty, matching the bits that mqtt-flags sets.
: mqtt-connect  ( -- )
   false to mqtt-connack?
   0 to mqtt-session-present

   $10 mqtt{
      " MQTT" m$,              \ Protocol Name
      4 m,                     \ Protocol Level
      mqtt-flags m,            \ Connect flags
      mqtt-keepalive mw,       \ Keep-alive interval

      mqtt-client-id$ m$,

      mqtt-will$  dup 0=  if   ( msg$ topic$ )
         2drop 2drop           \ No will configured
      else
         m$,  m$,              \ Will topic first, then will message
      then

      mqtt-username$ ?m$,
      mqtt-password$ ?m$,
   }mqtt
;
\ Publish payload$ on topic$ at QOS 0 (no packet id, no acknowledgement).
\ A nonzero "retain" sets control bit 0; a nonzero "dup" sets bit 3.
: mqtt-publish-qos0  ( payload$ topic$ dup retain -- )
   0<> 1 and              ( payload$ topic$ dup retain-bit )
   swap 0<> 8 and  or     ( payload$ topic$ flag-bits )
   $30 or                 ( payload$ topic$ control )
   mqtt{                  ( payload$ topic$ )
      m$,                 ( payload$ )
      m-payload,          ( )
   }mqtt
;
\ There is no receiver response to a QOS 0 publish
\ Publish payload$ on topic$ at the requested QOS level.
\   dup     nonzero sets the DUP flag (control bit 3)
\   retain  nonzero sets the RETAIN flag (control bit 0)
\   id      packet identifier, sent only when the QOS is 1 or 2
\   qos     requested level; clamped to the valid range 0..2
\ The QOS occupies control bits 2:1. A QOS 0 PUBLISH carries no packet
\ identifier, so id is discarded in that case.
\ Only the transmission is done here; the acknowledgement sequences for
\ QOS 1 and 2 are handled (as stubs) by handle-pub{ack,rec,rel,comp}.
: mqtt-publish-qos  ( payload$ topic$ dup retain id qos -- )
   0 max 2 min                ( payload$ topic$ dup retain id qos' )
   swap >r  dup >r            ( payload$ topic$ dup retain qos'  r: id qos' )
   1 lshift $30 or            ( payload$ topic$ dup retain control  r: id qos' )
   swap 0<> 1 and or          ( payload$ topic$ dup control  r: id qos' )
   swap 0<> 8 and or          ( payload$ topic$ control  r: id qos' )
   mqtt{                      ( payload$ topic$  r: id qos' )
      m$,                     ( payload$  r: id qos' )
      r> r> swap              ( payload$ id qos' )
      if  mw,  else  drop  then   ( payload$ )  \ Packet id only for QOS > 0
      m-payload,              ( )
   }mqtt
;
\ PUBACK - the receiver's response to a QOS 1 publish
: mqtt-puback  ( id -- )
   $40 mqtt{  mw,  }mqtt
;

\ PUBREC - the receiver's first response to a QOS 2 publish
: mqtt-pubrec  ( id -- )
   $50 mqtt{  mw,  }mqtt
;
\ Sender then sends this third stage for QOS2
\ PUBREL  $62 $02 packet-id.w
\ The low four bits of a PUBREL control byte are reserved and must be
\ 0010, so the control byte is $62; a server treats any other value as
\ a malformed packet.
: mqtt-pubrel  ( id -- )  $62 mqtt{ mw, }mqtt  ;
\ PUBCOMP - the receiver's fourth and final stage of a QOS 2 exchange
\ PUBCOMP $70 $02 packet-id.w \ QOS 2
: mqtt-pubcomp  ( id -- )
   $70 mqtt{  mw,  }mqtt
;
\ Send a SUBSCRIBE packet for n topic filters.
\ Each entry on the stack is a requested QOS beneath its topic string;
\ entries are emitted in the order they come off the stack, so the last
\ one pushed is sent first. id is the packet identifier that the server
\ echoes in its SUBACK.
\ ?do is used so that n = 0 emits no entries instead of running away.
: mqtt-subscribe  ( n*[ qos topic$ ] n id -- )
   $82 mqtt{                  ( n*[ qos topic$ ] n id )
      mw,                     ( n*[ qos topic$ ] n )
      0  ?do                  ( n*[ qos topic$ ] )
         m$,  m,              ( m*[ qos topic$ ] )
      loop                    ( )
   }mqtt
;
\ Subscribe, at QOS 0, to every topic that has a handler word in the
\ mqtt-topics vocabulary. One SUBSCRIBE packet is sent per topic, each
\ with the same fixed packet identifier (#1234).
: subscribe-all  ( -- )
   ['] mqtt-topics follow
   begin  another?  while            ( token )
      >name$                         ( topic$ )
      ." Subscribing to " 2dup type cr
      0 -rot                         ( qos topic$ )
      1 #1234 mqtt-subscribe         ( )
   repeat
;
\ Send an UNSUBSCRIBE packet for n topic filters; id is the packet
\ identifier that the server echoes in its UNSUBACK.
\ The low four bits of an UNSUBSCRIBE control byte are reserved and must
\ be 0010 (as for SUBSCRIBE, $82), so the control byte is $a2; a server
\ treats any other value as a malformed packet.
\ ?do is used so that n = 0 emits no topics instead of running away.
: mqtt-unsubscribe  ( n*topic$ n id -- )
   $a2 mqtt{  mw,             ( n*topic$ n )
      0  ?do  m$,  loop       ( )
   }mqtt
;
\ PINGREQ - header only, no body
: mqtt-pingreq  ( -- )
   $c0 mqtt{  }mqtt
;

\ DISCONNECT - header only, no body
: mqtt-disconnect  ( -- )
   $e0 mqtt{  }mqtt
;
\ Stub handlers for the QOS 1 and QOS 2 acknowledgement packets.
\ Each one just reports the packet type and its packet identifier.
: handle-puback   ( id -- )
   ." Puback " .d cr
;
: handle-pubrec   ( id -- )
   ." Pubrec " .d cr
;
: handle-pubrel   ( id -- )
   ." Pubrel " .d cr
;
: handle-pubcomp  ( id -- )
   ." Pubcomp " .d cr
;
\ Process the body of a SUBACK: a 2-byte packet identifier followed by
\ one return code per requested subscription. A return code with the
\ $80 bit set means that subscription failed; each failure is reported
\ by its position in the list.
\ A body too short to hold the identifier and at least one return code
\ is reported and discarded.
: handle-suback  ( adr len -- )
   dup 3 <  if                        ( adr len )
      ." SUBACK short packet" cr
      2drop exit                      \ Don't leave adr len behind
   then                               ( adr len )
   2 /string                          ( adr' len' )  \ Skip the packet identifier
   0  do                              ( adr )
      dup i + c@ $80 and  if          ( adr )
         ." Subscription " i . ." failed" cr
      then                            ( adr )
   loop                               ( adr )
   drop                               ( )
;
\ UNSUBACK: the packet identifier is ignored; just report it
: handle-unsuback  ( id -- )
   drop
   ." Unsubscribed" cr
;

\ PINGRESP: report the reply to a ping
: handle-pingresp  ( -- )
   ." PONG" cr
;
\ Split the body of a PUBLISH into its topic and payload.
\ The body starts with a 16-bit big-endian topic length, then the topic
\ bytes; whatever follows is returned as the payload.
\ The topic length comes from the network, so it is clamped to the
\ number of bytes actually present. Otherwise a bogus length would
\ yield a negative payload length and a topic extending past the data.
\ The caller must supply at least the 2 length bytes.
: >payload&topic  ( adr len -- payload$ topic$ )
   over be-w@ >r                 ( adr len  r: topic-len )
   2 /string                     ( topic-adr totlen  r: topic-len )
   r> over min  0 max  >r        ( topic-adr totlen  r: topic-len' )
   over r@                       ( topic-adr totlen topic-adr topic-len'  r: topic-len' )
   2swap r> /string              ( topic$ payload$ )
   2swap                         ( payload$ topic$ )
;
\ Dispatch an incoming PUBLISH body to the handler for its topic.
\ The topic name is looked up in the mqtt-topics vocabulary; a matching
\ word is executed with the payload string ( payload$ ) on the stack.
\ A body shorter than the 2-byte topic length, or a topic with no
\ handler, is reported and dropped.
: handle-publish  ( adr len -- )
   dup 2 <  if                                     ( adr len )
      ." Short MQTT PUBLISH" cr
      2drop exit
   then                                            ( adr len )

   >payload&topic                                  ( payload$ topic$ )
   2dup ['] mqtt-topics search-wordlist  0=  if    ( payload$ topic$ )
      ." No MQTT handler for topic " type cr       ( payload$ )
      2drop exit
   then                                            ( payload$ topic$ xt )

   nip nip execute                                 ( )
;
\ Report a packet whose body length len differs from the expected
\ length explen; msg$ names the packet type for the report.
\ This only prints a diagnostic - it does not stop the caller from
\ going on to process the packet.
\ A space separates the packet name from the message text, so the
\ output reads "CONNACK bad message length", not "CONNACKbad ...".
: ?badlen  ( len explen msg$ -- )
   2swap <>  if                        ( msg$ )
      type space ." bad message length" cr
   else
      2drop
   then
;
\ Print the reason the server gave for refusing the session
: .connack-error  ( code -- )
   ." MQTT session refused - "
   case
      1  of  ." bad protocol version"   endof
      2  of  ." identifier rejected"    endof
      3  of  ." server unavailable"     endof
      4  of  ." bad username/password"  endof
      5  of  ." not authorized"         endof
      ( default )  ." ?"
   endcase
   cr
;

\ CONNACK $20 $02 sp ret
\ Records the session-present bit; a nonzero return code is reported and
\ leaves mqtt-connack? false, otherwise mqtt-connack? is set true.
: handle-connack  ( adr len -- )
   2 " CONNACK" ?badlen                         ( adr )
   dup c@ 1 and 0<> to mqtt-session-present     ( adr )
   1+ c@  ?dup  if  .connack-error  exit  then  ( )
   true to mqtt-connack?
;

\ Check that a body is exactly the 2 bytes of a packet identifier
\ (reporting name$ if it is not) and fetch the identifier.
: >packet-id  ( adr len name$ -- id )
   2 -rot ?badlen     ( adr )
   be-w@              ( id )
;

\ Handle one complete MQTT message: adr len covers the 2-byte fixed
\ header (control byte, length byte) and the body. The packet type in
\ the high nibble of the control byte selects the handler, which is
\ given just the body. Unrecognized types are silently ignored.
: mqtt-handle-message  ( adr len -- )
   over c@ 4 rshift >r                ( adr len  r: type )
   2 /string  r>                      ( adr' len' type )
   case                               ( adr len )
      $2  of  handle-connack                               endof
      \ PUBLISH $30 len topic$ value$
      $3  of  handle-publish                               endof
      \ PUBACK  $40 $02 packet-id.w
      $4  of  " PUBACK"   >packet-id  handle-puback        endof
      \ PUBREC  $50 $02 packet-id.w
      $5  of  " PUBREC"   >packet-id  handle-pubrec        endof
      \ PUBREL  $60 $02 packet-id.w
      $6  of  " PUBREL"   >packet-id  handle-pubrel        endof
      \ PUBCOMP $70 $02 packet-id.w
      $7  of  " PUBCOMP"  >packet-id  handle-pubcomp       endof
      \ SUBACK  $90 len id.w (len-2)*[$80|QOS]  $80 bit means fail - it is 0 for success
      $9  of  handle-suback                                endof
      \ UNSUBACK $b0 02 id.w
      $b  of  " UNSUBACK" >packet-id  handle-unsuback      endof
      \ PINGRESP $d0 00
      $d  of  2drop handle-pingresp                        endof
      ( default )  nip nip
   endcase
;
\ Called with each chunk of data received from the server (installed as
\ handle-peer-data by mqtt-start). The chunk may hold several MQTT
\ messages back to back; each one is passed to mqtt-handle-message.
\ peer is ignored.
\ A message's total size is its length byte plus the 2-byte fixed header.
\ If that exceeds the data actually received, the message is truncated;
\ it is reported and the rest of the chunk is discarded instead of
\ letting a handler read past the end of the received data.
\ Only single-byte length fields are understood.
: mqtt-received  ( adr len peer -- )
   drop                                   ( adr len )
   begin  dup 2 >=  while                 ( adr len )
      over 1+ c@ 2+                       ( adr len thislen )
      2dup <  if                          ( adr len thislen )
         ." Truncated MQTT message" cr
         drop 2drop exit
      then                                ( adr len thislen )
      >r                                  ( adr len  r: thislen )
      over r@ mqtt-handle-message         ( adr len  r: thislen )
      r> /string                          ( adr' len' )
   repeat                                 ( adr' len' )
   2drop
;
\ Open the connection to the MQTT server and establish a session.
\ Installs mqtt-received as the receive handler, connects to
\ mqtt-server$ on mqtt-port$ (aborting on failure), sends CONNECT, then
\ polls until the CONNACK handler sets mqtt-connack?.
\ NOTE(review): a refused CONNACK never sets mqtt-connack?, so the
\ polling loop has no exit in that case - confirm whether do-tcp-poll
\ can abort when the server closes the connection.
: mqtt-start  ( -- )
   ['] mqtt-received to handle-peer-data

   #50 mqtt-port$ mqtt-server$ stream-connect    ( fd )
   dup to mqtt-fd                                ( fd )
   0<  abort" Failed to connect to MQTT server"  ( )

   mqtt-connect
   begin
      mqtt-fd do-tcp-poll
      mqtt-connack?
   until
;