|
| 1 | +// SPDX-License-Identifier: MPL-2.0 |
| 2 | +// FlowSDK QUIC MQTT client example (macOS / Linux) |
| 3 | +// |
| 4 | +// Usage: |
| 5 | +// LIBRARY_PATH=swift/lib swift run QuicClientExample [host] [port] |
| 6 | +// |
| 7 | +// For Wireshark key logging: |
| 8 | +// SSLKEYLOGFILE=~/tmp/sslkeylog.txt LIBRARY_PATH=swift/lib swift run QuicClientExample |
| 9 | +// |
| 10 | +// Default broker: broker.emqx.io:14567 |
| 11 | + |
| 12 | +import Foundation |
| 13 | +import FlowSDK |
| 14 | + |
| 15 | +#if canImport(Darwin) |
| 16 | +import Darwin |
| 17 | +#elseif canImport(Glibc) |
| 18 | +import Glibc |
| 19 | +#endif |
| 20 | + |
| 21 | +// MARK: - Configuration |
| 22 | + |
| 23 | +private let brokerHost = CommandLine.arguments.count > 1 ? CommandLine.arguments[1] : "broker.emqx.io" |
| 24 | +private let brokerPort = CommandLine.arguments.count > 2 ? Int(CommandLine.arguments[2])! : 14567 |
| 25 | +private let runDurationMs: UInt64 = 10_000 |
| 26 | +private let tickIntervalMs: UInt64 = 10 |
| 27 | +private let recvBufSize = 65536 |
| 28 | + |
| 29 | +// MARK: - POSIX UDP helpers |
| 30 | + |
| 31 | +private func resolveHostAddr(_ host: String, port: Int) -> sockaddr_in { |
| 32 | + var hints = addrinfo() |
| 33 | + hints.ai_family = AF_INET |
| 34 | + hints.ai_socktype = SOCK_DGRAM |
| 35 | + var res: UnsafeMutablePointer<addrinfo>? |
| 36 | + guard getaddrinfo(host, String(port), &hints, &res) == 0, let ai = res else { |
| 37 | + fatalError("getaddrinfo failed for \(host):\(port)") |
| 38 | + } |
| 39 | + defer { freeaddrinfo(res) } |
| 40 | + var addr = sockaddr_in() |
| 41 | + withUnsafeMutableBytes(of: &addr) { |
| 42 | + $0.copyMemory(from: UnsafeRawBufferPointer(start: ai.pointee.ai_addr, |
| 43 | + count: Int(ai.pointee.ai_addrlen))) |
| 44 | + } |
| 45 | + return addr |
| 46 | +} |
| 47 | + |
| 48 | +private func makeNonBlockingUdpSocket() -> Int32 { |
| 49 | + let fd = socket(AF_INET, SOCK_DGRAM, 0) |
| 50 | + guard fd >= 0 else { fatalError("socket() failed") } |
| 51 | + let flags = fcntl(fd, F_GETFL) |
| 52 | + _ = fcntl(fd, F_SETFL, flags | O_NONBLOCK) |
| 53 | + return fd |
| 54 | +} |
| 55 | + |
| 56 | +private func sendDatagram(_ fd: Int32, data: Data, to addr: inout sockaddr_in) { |
| 57 | + guard !data.isEmpty else { return } |
| 58 | + data.withUnsafeBytes { ptr in |
| 59 | + withUnsafeBytes(of: &addr) { addrPtr in |
| 60 | + _ = sendto(fd, ptr.baseAddress!, data.count, 0, |
| 61 | + addrPtr.baseAddress!.assumingMemoryBound(to: sockaddr.self), |
| 62 | + socklen_t(MemoryLayout<sockaddr_in>.size)) |
| 63 | + } |
| 64 | + } |
| 65 | +} |
| 66 | + |
| 67 | +private func recvDatagram(_ fd: Int32, buf: inout [UInt8]) -> Data? { |
| 68 | + var src = sockaddr_in() |
| 69 | + var srcLen = socklen_t(MemoryLayout<sockaddr_in>.size) |
| 70 | + let n = withUnsafeMutableBytes(of: &src) { srcPtr in |
| 71 | + recvfrom(fd, &buf, buf.count, 0, |
| 72 | + srcPtr.baseAddress!.assumingMemoryBound(to: sockaddr.self), |
| 73 | + &srcLen) |
| 74 | + } |
| 75 | + guard n > 0 else { return nil } |
| 76 | + return Data(buf[0..<n]) |
| 77 | +} |
| 78 | + |
| 79 | +private func waitReadable(_ fd: Int32, timeoutMs: Int) -> Bool { |
| 80 | + var pfd = pollfd(fd: fd, events: Int16(POLLIN), revents: 0) |
| 81 | + return poll(&pfd, 1, Int32(timeoutMs)) > 0 |
| 82 | +} |
| 83 | + |
| 84 | +// MARK: - Engine helpers |
| 85 | + |
| 86 | +func nowMs(since startMs: UInt64) -> UInt64 { |
| 87 | + return UInt64(Date().timeIntervalSince1970 * 1000) - startMs |
| 88 | +} |
| 89 | + |
| 90 | +/// Addr string expected by the FFI engine: "1.2.3.4:14567" |
| 91 | +func addrString(addr: sockaddr_in, port: Int) -> String { |
| 92 | + var a = addr.sin_addr |
| 93 | + var buf = [CChar](repeating: 0, count: Int(INET_ADDRSTRLEN)) |
| 94 | + inet_ntop(AF_INET, &a, &buf, socklen_t(INET_ADDRSTRLEN)) |
| 95 | + return "\(String(cString: buf)):\(port)" |
| 96 | +} |
| 97 | + |
| 98 | +func sendOutgoing(_ engine: QuicMqttEngineFfi, fd: Int32, to addr: inout sockaddr_in) { |
| 99 | + for dgram in engine.takeOutgoingDatagrams() { |
| 100 | + sendDatagram(fd, data: dgram.data, to: &addr) |
| 101 | + } |
| 102 | +} |
| 103 | + |
| 104 | +// MARK: - Entry point |
| 105 | + |
| 106 | +print("Initializing FlowSDK QUIC Swift Example...") |
| 107 | + |
| 108 | +let opts = MqttOptionsFfi( |
| 109 | + clientId: "swift_quic_\(Int(Date().timeIntervalSince1970) % 100_000)", |
| 110 | + mqttVersion: 5, |
| 111 | + cleanStart: true, |
| 112 | + keepAlive: 30, // Must match QUIC idle timeout (30 s) |
| 113 | + username: nil, |
| 114 | + password: nil, |
| 115 | + reconnectBaseDelayMs: 1_000, |
| 116 | + reconnectMaxDelayMs: 10_000, |
| 117 | + maxReconnectAttempts: 3 |
| 118 | +) |
| 119 | + |
| 120 | +let engine = QuicMqttEngineFfi(opts: opts) |
| 121 | +print("QUIC Engine created.") |
| 122 | + |
| 123 | +// Enable TLS key logging when SSLKEYLOGFILE is set (for Wireshark) |
| 124 | +let enableKeyLog = ProcessInfo.processInfo.environment["SSLKEYLOGFILE"] != nil |
| 125 | +let tlsOpts = MqttTlsOptionsFfi( |
| 126 | + caCertFile: nil, |
| 127 | + clientCertFile: nil, |
| 128 | + clientKeyFile: nil, |
| 129 | + insecureSkipVerify: true, // Demo broker only — do not use in production |
| 130 | + alpnProtocols: [], |
| 131 | + enableKeyLog: enableKeyLog |
| 132 | +) |
| 133 | + |
| 134 | +// Open non-blocking UDP socket and resolve broker |
| 135 | +var brokerAddr = resolveHostAddr(brokerHost, port: brokerPort) |
| 136 | +let fd = makeNonBlockingUdpSocket() |
| 137 | +let serverAddrStr = addrString(addr: brokerAddr, port: brokerPort) |
| 138 | + |
| 139 | +// Track relative time from engine creation (required by tick API) |
| 140 | +let engineStartMs = UInt64(Date().timeIntervalSince1970 * 1000) |
| 141 | + |
| 142 | +print("Connecting to QUIC broker at \(serverAddrStr) (host: \(brokerHost))...") |
| 143 | +engine.connect(serverAddr: serverAddrStr, serverName: brokerHost, |
| 144 | + tlsOpts: tlsOpts, nowMs: nowMs(since: engineStartMs)) |
| 145 | +// Tick immediately to generate initial QUIC handshake packets |
| 146 | +_ = engine.handleTick(nowMs: nowMs(since: engineStartMs)) |
| 147 | +sendOutgoing(engine, fd: fd, to: &brokerAddr) |
| 148 | + |
| 149 | +// Main event loop |
| 150 | +var recvBuf = [UInt8](repeating: 0, count: recvBufSize) |
| 151 | +var subscribed = false |
| 152 | +var published = false |
| 153 | + |
| 154 | +while nowMs(since: engineStartMs) < runDurationMs { |
| 155 | + // Drain all received datagrams |
| 156 | + if waitReadable(fd, timeoutMs: Int(tickIntervalMs)) { |
| 157 | + while let data = recvDatagram(fd, buf: &recvBuf) { |
| 158 | + engine.handleDatagram(data: data, remoteAddr: serverAddrStr, |
| 159 | + nowMs: nowMs(since: engineStartMs)) |
| 160 | + } |
| 161 | + } |
| 162 | + |
| 163 | + // Tick the engine (drives QUIC timers + MQTT keepalive) |
| 164 | + let events = engine.handleTick(nowMs: nowMs(since: engineStartMs)) |
| 165 | + |
| 166 | + for event in events { |
| 167 | + switch event { |
| 168 | + case .connected(let r): |
| 169 | + print("Connected! sessionPresent=\(r.sessionPresent)") |
| 170 | + if !subscribed { |
| 171 | + let pid = engine.subscribe(topicFilter: "test/swift/quic", qos: 1) |
| 172 | + print("Subscribed to 'test/swift/quic' (PID \(pid))") |
| 173 | + subscribed = true |
| 174 | + sendOutgoing(engine, fd: fd, to: &brokerAddr) |
| 175 | + } |
| 176 | + case .subscribed(let r): |
| 177 | + print("Subscribe ack PID \(r.packetId)") |
| 178 | + if !published { |
| 179 | + let payload = Data("Hello from Swift QUIC!".utf8) |
| 180 | + let pid = engine.publish(topic: "test/swift/quic", payload: payload, qos: 1) |
| 181 | + print("Published to 'test/swift/quic' (PID \(pid))") |
| 182 | + published = true |
| 183 | + // Tick immediately so QUIC frames are generated before the next sendOutgoing |
| 184 | + _ = engine.handleTick(nowMs: nowMs(since: engineStartMs)) |
| 185 | + sendOutgoing(engine, fd: fd, to: &brokerAddr) |
| 186 | + } |
| 187 | + case .messageReceived(let m): |
| 188 | + let msg = String(data: m.payload, encoding: .utf8) ?? "<binary>" |
| 189 | + print("✅ Message on '\(m.topic)': \(msg)") |
| 190 | + case .published(let r): |
| 191 | + print("✅ Publish ack PID \(r.packetId.map(String.init) ?? "none")") |
| 192 | + case .disconnected(let reasonCode): |
| 193 | + print("⚠️ Disconnected. reasonCode=\(String(describing: reasonCode))") |
| 194 | + case .error(let message): |
| 195 | + print("❌ Error: \(message)") |
| 196 | + case .reconnectNeeded: |
| 197 | + print("ℹ Reconnect needed") |
| 198 | + case .reconnectScheduled(let attempt, let delayMs): |
| 199 | + print("ℹ Reconnect attempt \(attempt) in \(delayMs)ms") |
| 200 | + case .pingResponse(let success): |
| 201 | + print("ℹ Ping response: success=\(success)") |
| 202 | + case .unsubscribed(_): |
| 203 | + break |
| 204 | + } |
| 205 | + } |
| 206 | + |
| 207 | + // Forward engine-generated outgoing datagrams |
| 208 | + sendOutgoing(engine, fd: fd, to: &brokerAddr) |
| 209 | +} |
| 210 | + |
| 211 | +// Graceful disconnect |
| 212 | +print("Run time elapsed, disconnecting...") |
| 213 | +engine.disconnect() |
| 214 | +sendOutgoing(engine, fd: fd, to: &brokerAddr) |
| 215 | +Darwin.close(fd) |
| 216 | +print("Done.") |
0 commit comments