From 5208f64ab6313e1abf4cc87b685dff9ebd1538e9 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Sat, 23 May 2026 15:29:03 +1200 Subject: [PATCH 1/3] feat(Push): add Appwrite Push (MQTT 5) adapter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a self-hosted, low-power alternative to FCM/APNS. Publishes notifications over MQTT 5 to a per-device topic, allowing a single persistent TLS connection on the device with a long keep-alive interval (30 minutes by default) — the same model that lets FCM be low-power on Android. - Helpers/MQTT: minimal MQTT 5 codec (control packet encode/decode) so the adapter does not need an external MQTT client dependency. - Adapter/Push/Appwrite: publisher adapter. Connects over TCP/TLS, authenticates with a short-lived HMAC-signed JWT, publishes one QoS 1 PUBLISH per device with content-type and message-expiry properties, maps broker reason codes back to the standard expired-token signal so Appwrite's target invalidation works the same way as for FCM/APNS. Tests: 10 codec round-trip cases, 2 adapter integration cases driven by a fake broker spawned via proc_open. PHPStan level 6 clean, Pint PSR-12 clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../Messaging/Adapter/Push/Appwrite.php | 379 +++++++++++ src/Utopia/Messaging/Helpers/MQTT.php | 618 ++++++++++++++++++ tests/Messaging/Adapter/Push/AppwriteTest.php | 185 ++++++ tests/Messaging/Adapter/Push/FakeBroker.php | 109 +++ tests/Messaging/Helpers/MQTTTest.php | 171 +++++ 5 files changed, 1462 insertions(+) create mode 100644 src/Utopia/Messaging/Adapter/Push/Appwrite.php create mode 100644 src/Utopia/Messaging/Helpers/MQTT.php create mode 100644 tests/Messaging/Adapter/Push/AppwriteTest.php create mode 100644 tests/Messaging/Adapter/Push/FakeBroker.php create mode 100644 tests/Messaging/Helpers/MQTTTest.php diff --git a/src/Utopia/Messaging/Adapter/Push/Appwrite.php b/src/Utopia/Messaging/Adapter/Push/Appwrite.php new file mode 100644 index 00000000..a3f1b543 --- /dev/null +++ b/src/Utopia/Messaging/Adapter/Push/Appwrite.php @@ -0,0 +1,379 @@ +serverId === '') { + $this->serverId = self::SERVER_CLIENT_PREFIX . '-' . \bin2hex(\random_bytes(6)); + } + } + + public function getName(): string + { + return static::NAME; + } + + public function getMaxMessagesPerRequest(): int + { + return 5000; + } + + /** + * {@inheritdoc} + */ + protected function process(PushMessage $message): array + { + $payload = $this->buildPayload($message); + $expiry = $this->resolveExpiry($message); + + $response = new Response($this->getType()); + + $socket = $this->connect(); + + try { + $this->handshake($socket); + + foreach ($message->getTo() as $token) { + $topic = $this->topicForToken($token); + + try { + $packetId = $this->nextPacketId(); + $packet = MQTT::encodePublish( + topic: $topic, + payload: $payload, + qos: 1, + retain: false, + dup: false, + packetId: $packetId, + properties: [ + 'messageExpiryInterval' => $expiry, + 'contentType' => 'application/json', + ], + ); + + $this->write($socket, $packet); + $ack = $this->readPacket($socket); + if ($ack['type'] !== MQTT::PACKET_PUBACK) { + $response->addResult($token, 'Broker did not acknowledge PUBLISH'); + continue; + } + + $parsed = MQTT::parsePuback($ack['payload']); + if ($parsed['reasonCode'] !== MQTT::REASON_SUCCESS) { + $error = $this->errorForReasonCode($parsed['reasonCode']); + $response->addResult($token, $error); + continue; + } + + $response->incrementDeliveredTo(); + $response->addResult($token); + } catch (\Throwable $error) { + $response->addResult($token, $error->getMessage()); + } + } + + try { + $this->write($socket, MQTT::encodeDisconnect()); + } catch (\Throwable) { + // Best effort; some brokers may have already closed the socket. + } + } finally { + $this->close($socket); + } + + return $response->toArray(); + } + + /** + * Build a single payload that the device runtime can render. Mirrors the + * shape exposed to FCM/APNS so SDK consumers see a consistent envelope. + * + * @return string JSON-encoded payload + */ + private function buildPayload(PushMessage $message): string + { + $envelope = []; + + if ($message->getTitle() !== null) { + $envelope['notification']['title'] = $message->getTitle(); + } + if ($message->getBody() !== null) { + $envelope['notification']['body'] = $message->getBody(); + } + if ($message->getImage() !== null) { + $envelope['notification']['image'] = $message->getImage(); + } + if ($message->getIcon() !== null) { + $envelope['notification']['icon'] = $message->getIcon(); + } + if ($message->getColor() !== null) { + $envelope['notification']['color'] = $message->getColor(); + } + if ($message->getSound() !== null) { + $envelope['notification']['sound'] = $message->getSound(); + } + if ($message->getTag() !== null) { + $envelope['notification']['tag'] = $message->getTag(); + } + if ($message->getBadge() !== null) { + $envelope['notification']['badge'] = $message->getBadge(); + } + if ($message->getAction() !== null) { + $envelope['notification']['action'] = $message->getAction(); + } + if ($message->getContentAvailable() !== null) { + $envelope['notification']['contentAvailable'] = (bool)$message->getContentAvailable(); + } + if ($message->getCritical() !== null) { + $envelope['notification']['critical'] = (bool)$message->getCritical(); + } + if ($message->getData() !== null) { + $envelope['data'] = $message->getData(); + } + if ($message->getPriority() !== null) { + $envelope['priority'] = match ($message->getPriority()) { + Priority::HIGH => 'high', + Priority::NORMAL => 'normal', + }; + } + + return \json_encode($envelope, JSON_UNESCAPED_SLASHES); + } + + private function resolveExpiry(PushMessage $message): int + { + if (\method_exists($message, 'getMessageExpiry')) { + $expiry = $message->getMessageExpiry(); + if (\is_int($expiry) && $expiry > 0) { + return $expiry; + } + } + + return $this->messageExpiry; + } + + private function topicForToken(string $token): string + { + return self::TOPIC_PREFIX . '/' . $token; + } + + private function nextPacketId(): int + { + $this->packetId = ($this->packetId + 1) & 0xFFFF; + if ($this->packetId === 0) { + $this->packetId = 1; + } + + return $this->packetId; + } + + /** + * @return resource + */ + private function connect() + { + $url = $this->resolveEndpoint(); + $context = \stream_context_create([ + 'ssl' => [ + 'verify_peer' => true, + 'verify_peer_name' => true, + 'SNI_enabled' => true, + ], + ]); + + $socket = @\stream_socket_client( + $url, + $errno, + $errstr, + self::CONNECT_TIMEOUT, + STREAM_CLIENT_CONNECT, + $context, + ); + + if (!$socket) { + throw new \RuntimeException("Unable to connect to Appwrite Push broker at {$url}: {$errstr} (errno {$errno})"); + } + + \stream_set_timeout($socket, self::READ_TIMEOUT); + + return $socket; + } + + private function resolveEndpoint(): string + { + $endpoint = \rtrim($this->endpoint); + if ($endpoint === '') { + throw new \RuntimeException('Appwrite Push endpoint is not configured'); + } + + $scheme = $this->tls ? 'tls' : 'tcp'; + + if (\str_contains($endpoint, '://')) { + $parts = \parse_url($endpoint); + $host = $parts['host'] ?? ''; + $port = $parts['port'] ?? ($this->tls ? 8883 : 1883); + + return "{$scheme}://{$host}:{$port}"; + } + + if (\str_contains($endpoint, ':')) { + return "{$scheme}://{$endpoint}"; + } + + $port = $this->tls ? 8883 : 1883; + + return "{$scheme}://{$endpoint}:{$port}"; + } + + /** + * @param resource $socket + */ + private function handshake($socket): void + { + $token = $this->issueServerJwt(); + $packet = MQTT::encodeConnect( + clientId: $this->serverId, + username: 'server', + password: $token, + keepAlive: self::KEEP_ALIVE, + cleanStart: true, + properties: [ + 'sessionExpiryInterval' => 0, + ], + ); + + $this->write($socket, $packet); + + $response = $this->readPacket($socket); + if ($response['type'] !== MQTT::PACKET_CONNACK) { + throw new \RuntimeException('Broker did not respond with CONNACK'); + } + + $connack = MQTT::parseConnack($response['payload']); + if ($connack['reasonCode'] !== MQTT::REASON_SUCCESS) { + throw new \RuntimeException("Broker rejected CONNECT (reason {$connack['reasonCode']})"); + } + } + + private function issueServerJwt(): string + { + $now = \time(); + $claims = [ + 'iss' => 'appwrite', + 'sub' => $this->serverId, + 'iat' => $now, + 'exp' => $now + self::JWT_TTL, + 'scope' => self::JWT_SCOPE, + ]; + + return JWT::encode($claims, $this->signingKey, self::JWT_ALGORITHM); + } + + /** + * @param resource $socket + * @return array{type: int, flags: int, payload: string} + */ + private function readPacket($socket): array + { + $buffer = ''; + while (true) { + $chunk = @\fread($socket, 4096); + if ($chunk === false || $chunk === '') { + if (\feof($socket)) { + throw new \RuntimeException('Broker closed the connection'); + } + + $info = \stream_get_meta_data($socket); + if (!empty($info['timed_out'])) { + throw new \RuntimeException('Broker read timeout'); + } + + continue; + } + + $buffer .= $chunk; + + $packet = MQTT::decodePacket($buffer); + if ($packet !== null) { + return $packet; + } + } + } + + /** + * @param resource $socket + */ + private function write($socket, string $bytes): void + { + $length = \strlen($bytes); + $written = 0; + + while ($written < $length) { + $result = @\fwrite($socket, \substr($bytes, $written)); + if ($result === false || $result === 0) { + throw new \RuntimeException('Failed to write to broker socket'); + } + $written += $result; + } + } + + /** + * @param resource $socket + */ + private function close($socket): void + { + if (\is_resource($socket)) { + @\fclose($socket); + } + } + + private function errorForReasonCode(int $code): string + { + return match ($code) { + 0x10 => $this->getExpiredErrorMessage(), // No matching subscribers + 0x90 => 'Topic name invalid', + 0x97 => 'Quota exceeded', + 0x99 => 'Payload format invalid', + 0x87 => 'Not authorized', + default => "Broker returned reason code 0x" . \dechex($code), + }; + } +} diff --git a/src/Utopia/Messaging/Helpers/MQTT.php b/src/Utopia/Messaging/Helpers/MQTT.php new file mode 100644 index 00000000..4442d162 --- /dev/null +++ b/src/Utopia/Messaging/Helpers/MQTT.php @@ -0,0 +1,618 @@ + $properties Extra connection properties (session expiry, etc.). + */ + public static function encodeConnect( + string $clientId, + ?string $username = null, + ?string $password = null, + int $keepAlive = 60, + bool $cleanStart = true, + array $properties = [] + ): string { + $variable = self::encodeString(self::PROTOCOL_NAME); + $variable .= \chr(self::PROTOCOL_VERSION); + + $flags = 0; + if ($cleanStart) { + $flags |= 0x02; + } + if ($password !== null) { + $flags |= 0x40; + } + if ($username !== null) { + $flags |= 0x80; + } + + $variable .= \chr($flags); + $variable .= \pack('n', $keepAlive); + + $props = ''; + if (isset($properties['sessionExpiryInterval'])) { + $props .= \chr(self::PROPERTY_SESSION_EXPIRY) . \pack('N', (int)$properties['sessionExpiryInterval']); + } + if (isset($properties['authenticationMethod'])) { + $props .= \chr(self::PROPERTY_AUTHENTICATION_METHOD) . self::encodeString((string)$properties['authenticationMethod']); + } + if (isset($properties['authenticationData'])) { + $props .= \chr(self::PROPERTY_AUTHENTICATION_DATA) . self::encodeBinary((string)$properties['authenticationData']); + } + + $variable .= self::encodeVariableByteInteger(\strlen($props)) . $props; + + $payload = self::encodeString($clientId); + if ($username !== null) { + $payload .= self::encodeString($username); + } + if ($password !== null) { + $payload .= self::encodeBinary($password); + } + + return self::buildPacket(self::PACKET_CONNECT, 0, $variable . $payload); + } + + /** + * Encode a CONNACK packet. + * + * @param array $properties Optional server-keepalive/assigned-client-id etc. + */ + public static function encodeConnack(int $reasonCode, bool $sessionPresent = false, array $properties = []): string + { + $variable = \chr($sessionPresent ? 0x01 : 0x00); + $variable .= \chr($reasonCode); + + $props = ''; + if (isset($properties['serverKeepAlive'])) { + $props .= \chr(self::PROPERTY_SERVER_KEEP_ALIVE) . \pack('n', (int)$properties['serverKeepAlive']); + } + if (isset($properties['assignedClientId'])) { + $props .= \chr(self::PROPERTY_ASSIGNED_CLIENT_ID) . self::encodeString((string)$properties['assignedClientId']); + } + if (isset($properties['reasonString'])) { + $props .= \chr(self::PROPERTY_REASON_STRING) . self::encodeString((string)$properties['reasonString']); + } + if (isset($properties['maximumQoS'])) { + $props .= \chr(self::PROPERTY_MAXIMUM_QOS) . \chr((int)$properties['maximumQoS']); + } + if (isset($properties['retainAvailable'])) { + $props .= \chr(self::PROPERTY_RETAIN_AVAILABLE) . \chr($properties['retainAvailable'] ? 1 : 0); + } + if (isset($properties['receiveMaximum'])) { + $props .= \chr(self::PROPERTY_RECEIVE_MAXIMUM) . \pack('n', (int)$properties['receiveMaximum']); + } + if (isset($properties['wildcardSubscriptionAvailable'])) { + $props .= \chr(self::PROPERTY_WILDCARD_SUBSCRIPTION_AVAILABLE) . \chr($properties['wildcardSubscriptionAvailable'] ? 1 : 0); + } + if (isset($properties['sharedSubscriptionAvailable'])) { + $props .= \chr(self::PROPERTY_SHARED_SUBSCRIPTION_AVAILABLE) . \chr($properties['sharedSubscriptionAvailable'] ? 1 : 0); + } + + $variable .= self::encodeVariableByteInteger(\strlen($props)) . $props; + + return self::buildPacket(self::PACKET_CONNACK, 0, $variable); + } + + /** + * Encode a PUBLISH packet. + * + * @param array $properties Optional message expiry, content type, etc. + */ + public static function encodePublish( + string $topic, + string $payload, + int $qos = 0, + bool $retain = false, + bool $dup = false, + ?int $packetId = null, + array $properties = [] + ): string { + $flags = 0; + if ($dup) { + $flags |= 0x08; + } + $flags |= ($qos & 0x03) << 1; + if ($retain) { + $flags |= 0x01; + } + + $variable = self::encodeString($topic); + if ($qos > 0) { + if ($packetId === null) { + throw new \InvalidArgumentException('packetId is required for QoS > 0'); + } + $variable .= \pack('n', $packetId); + } + + $props = ''; + if (isset($properties['messageExpiryInterval'])) { + $props .= \chr(self::PROPERTY_MESSAGE_EXPIRY) . \pack('N', (int)$properties['messageExpiryInterval']); + } + if (isset($properties['contentType'])) { + $props .= \chr(self::PROPERTY_CONTENT_TYPE) . self::encodeString((string)$properties['contentType']); + } + if (isset($properties['correlationData'])) { + $props .= \chr(self::PROPERTY_CORRELATION_DATA) . self::encodeBinary((string)$properties['correlationData']); + } + if (isset($properties['responseTopic'])) { + $props .= \chr(self::PROPERTY_RESPONSE_TOPIC) . self::encodeString((string)$properties['responseTopic']); + } + foreach ($properties['userProperties'] ?? [] as $key => $value) { + $props .= \chr(self::PROPERTY_USER_PROPERTY) . self::encodeString((string)$key) . self::encodeString((string)$value); + } + + $variable .= self::encodeVariableByteInteger(\strlen($props)) . $props; + + return self::buildPacket(self::PACKET_PUBLISH, $flags, $variable . $payload); + } + + /** + * Encode a PUBACK packet. + */ + public static function encodePuback(int $packetId, int $reasonCode = self::REASON_SUCCESS): string + { + $variable = \pack('n', $packetId); + $variable .= \chr($reasonCode); + $variable .= \chr(0); + + return self::buildPacket(self::PACKET_PUBACK, 0, $variable); + } + + /** + * Encode a SUBACK packet. + * + * @param array $reasonCodes One reason code per topic filter in the SUBSCRIBE. + */ + public static function encodeSuback(int $packetId, array $reasonCodes): string + { + $variable = \pack('n', $packetId); + $variable .= \chr(0); + foreach ($reasonCodes as $code) { + $variable .= \chr($code); + } + + return self::buildPacket(self::PACKET_SUBACK, 0, $variable); + } + + /** + * Encode a PINGRESP packet. + */ + public static function encodePingresp(): string + { + return self::buildPacket(self::PACKET_PINGRESP, 0, ''); + } + + /** + * Encode a PINGREQ packet. + */ + public static function encodePingreq(): string + { + return self::buildPacket(self::PACKET_PINGREQ, 0, ''); + } + + /** + * Encode a DISCONNECT packet. + */ + public static function encodeDisconnect(int $reasonCode = self::REASON_SUCCESS): string + { + if ($reasonCode === self::REASON_SUCCESS) { + return self::buildPacket(self::PACKET_DISCONNECT, 0, ''); + } + + $variable = \chr($reasonCode) . \chr(0); + + return self::buildPacket(self::PACKET_DISCONNECT, 0, $variable); + } + + /** + * Decode a single MQTT control packet from a buffer. + * + * Returns null if the buffer does not yet contain a full packet. On success + * advances the &$buffer past the consumed bytes and returns the parsed packet. + * + * @return array{type: int, flags: int, payload: string}|null + */ + public static function decodePacket(string &$buffer): ?array + { + $length = \strlen($buffer); + if ($length < 2) { + return null; + } + + $firstByte = \ord($buffer[0]); + $type = ($firstByte >> 4) & 0x0F; + $flags = $firstByte & 0x0F; + + $offset = 1; + $remaining = self::readVariableByteInteger($buffer, $offset); + if ($remaining === null) { + return null; + } + + $total = $offset + $remaining; + if ($length < $total) { + return null; + } + + $payload = \substr($buffer, $offset, $remaining); + $buffer = \substr($buffer, $total); + + return [ + 'type' => $type, + 'flags' => $flags, + 'payload' => $payload, + ]; + } + + /** + * Parse a CONNECT packet body (the bytes after the fixed header). + * + * @return array{ + * protocol: string, + * version: int, + * flags: int, + * keepAlive: int, + * clientId: string, + * username: ?string, + * password: ?string, + * properties: array, + * cleanStart: bool + * } + */ + public static function parseConnect(string $payload): array + { + $offset = 0; + $protocol = self::readString($payload, $offset); + $version = \ord($payload[$offset++]); + $flags = \ord($payload[$offset++]); + $keepAlive = \unpack('n', \substr($payload, $offset, 2))[1]; + $offset += 2; + + $propLen = self::readVariableByteInteger($payload, $offset); + $props = self::readProperties(\substr($payload, $offset, $propLen)); + $offset += $propLen; + + $clientId = self::readString($payload, $offset); + + $username = null; + $password = null; + + if ($flags & 0x80) { + $username = self::readString($payload, $offset); + } + if ($flags & 0x40) { + $password = self::readBinary($payload, $offset); + } + + return [ + 'protocol' => $protocol, + 'version' => $version, + 'flags' => $flags, + 'cleanStart' => (bool)($flags & 0x02), + 'keepAlive' => $keepAlive, + 'clientId' => $clientId, + 'username' => $username, + 'password' => $password, + 'properties' => $props, + ]; + } + + /** + * Parse a PUBLISH packet body. $flags is the lower nibble of the fixed header. + * + * @return array{ + * topic: string, + * payload: string, + * qos: int, + * retain: bool, + * dup: bool, + * packetId: ?int, + * properties: array + * } + */ + public static function parsePublish(string $payload, int $flags): array + { + $qos = ($flags >> 1) & 0x03; + $retain = (bool)($flags & 0x01); + $dup = (bool)($flags & 0x08); + + $offset = 0; + $topic = self::readString($payload, $offset); + + $packetId = null; + if ($qos > 0) { + $packetId = \unpack('n', \substr($payload, $offset, 2))[1]; + $offset += 2; + } + + $propLen = self::readVariableByteInteger($payload, $offset); + $props = self::readProperties(\substr($payload, $offset, $propLen)); + $offset += $propLen; + + return [ + 'topic' => $topic, + 'payload' => \substr($payload, $offset), + 'qos' => $qos, + 'retain' => $retain, + 'dup' => $dup, + 'packetId' => $packetId, + 'properties' => $props, + ]; + } + + /** + * Parse a SUBSCRIBE packet body. + * + * @return array{ + * packetId: int, + * filters: array + * } + */ + public static function parseSubscribe(string $payload): array + { + $offset = 0; + $packetId = \unpack('n', \substr($payload, $offset, 2))[1]; + $offset += 2; + + $propLen = self::readVariableByteInteger($payload, $offset); + $offset += $propLen; + + $filters = []; + while ($offset < \strlen($payload)) { + $topic = self::readString($payload, $offset); + $options = \ord($payload[$offset++]); + $filters[] = [ + 'topic' => $topic, + 'qos' => $options & 0x03, + 'noLocal' => (bool)($options & 0x04), + 'retainAsPublished' => (bool)($options & 0x08), + 'retainHandling' => ($options >> 4) & 0x03, + ]; + } + + return [ + 'packetId' => $packetId, + 'filters' => $filters, + ]; + } + + /** + * Parse a CONNACK packet body. + * + * @return array{sessionPresent: bool, reasonCode: int, properties: array} + */ + public static function parseConnack(string $payload): array + { + $sessionPresent = (bool)(\ord($payload[0]) & 0x01); + $reasonCode = \ord($payload[1]); + $offset = 2; + $propLen = self::readVariableByteInteger($payload, $offset); + $props = self::readProperties(\substr($payload, $offset, $propLen)); + + return [ + 'sessionPresent' => $sessionPresent, + 'reasonCode' => $reasonCode, + 'properties' => $props, + ]; + } + + /** + * Parse a PUBACK packet body. + * + * @return array{packetId: int, reasonCode: int} + */ + public static function parsePuback(string $payload): array + { + $packetId = \unpack('n', \substr($payload, 0, 2))[1]; + $reasonCode = \strlen($payload) > 2 ? \ord($payload[2]) : self::REASON_SUCCESS; + + return [ + 'packetId' => $packetId, + 'reasonCode' => $reasonCode, + ]; + } + + private static function buildPacket(int $type, int $flags, string $body): string + { + $header = \chr((($type & 0x0F) << 4) | ($flags & 0x0F)); + + return $header . self::encodeVariableByteInteger(\strlen($body)) . $body; + } + + private static function encodeString(string $value): string + { + $length = \strlen($value); + if ($length > 0xFFFF) { + throw new \InvalidArgumentException("MQTT string exceeds 65535 byte limit ({$length} given)"); + } + + return \pack('n', $length) . $value; + } + + private static function encodeBinary(string $value): string + { + $length = \strlen($value); + if ($length > 0xFFFF) { + throw new \InvalidArgumentException("MQTT binary exceeds 65535 byte limit ({$length} given)"); + } + + return \pack('n', $length) . $value; + } + + private static function encodeVariableByteInteger(int $value): string + { + if ($value < 0 || $value > 268435455) { + throw new \InvalidArgumentException('Variable byte integer out of range.'); + } + + $bytes = ''; + do { + $byte = $value & 0x7F; + $value >>= 7; + if ($value > 0) { + $byte |= 0x80; + } + $bytes .= \chr($byte); + } while ($value > 0); + + return $bytes; + } + + private static function readVariableByteInteger(string $buffer, int &$offset): ?int + { + $multiplier = 1; + $value = 0; + $length = \strlen($buffer); + $start = $offset; + + do { + if ($offset >= $length) { + $offset = $start; + return null; + } + $byte = \ord($buffer[$offset++]); + $value += ($byte & 0x7F) * $multiplier; + if ($multiplier > 128 * 128 * 128) { + throw new \RuntimeException('Malformed variable byte integer'); + } + $multiplier *= 128; + } while (($byte & 0x80) !== 0); + + return $value; + } + + private static function readString(string $buffer, int &$offset): string + { + $len = \unpack('n', \substr($buffer, $offset, 2))[1]; + $offset += 2; + $value = \substr($buffer, $offset, $len); + $offset += $len; + + return $value; + } + + private static function readBinary(string $buffer, int &$offset): string + { + return self::readString($buffer, $offset); + } + + /** + * @return array + */ + private static function readProperties(string $buffer): array + { + $offset = 0; + $length = \strlen($buffer); + $properties = []; + + while ($offset < $length) { + $identifier = \ord($buffer[$offset++]); + + switch ($identifier) { + case self::PROPERTY_MESSAGE_EXPIRY: + $properties['messageExpiryInterval'] = \unpack('N', \substr($buffer, $offset, 4))[1]; + $offset += 4; + break; + case self::PROPERTY_CONTENT_TYPE: + $properties['contentType'] = self::readString($buffer, $offset); + break; + case self::PROPERTY_RESPONSE_TOPIC: + $properties['responseTopic'] = self::readString($buffer, $offset); + break; + case self::PROPERTY_CORRELATION_DATA: + $properties['correlationData'] = self::readBinary($buffer, $offset); + break; + case self::PROPERTY_SESSION_EXPIRY: + $properties['sessionExpiryInterval'] = \unpack('N', \substr($buffer, $offset, 4))[1]; + $offset += 4; + break; + case self::PROPERTY_RECEIVE_MAXIMUM: + $properties['receiveMaximum'] = \unpack('n', \substr($buffer, $offset, 2))[1]; + $offset += 2; + break; + case self::PROPERTY_AUTHENTICATION_METHOD: + $properties['authenticationMethod'] = self::readString($buffer, $offset); + break; + case self::PROPERTY_AUTHENTICATION_DATA: + $properties['authenticationData'] = self::readBinary($buffer, $offset); + break; + case self::PROPERTY_USER_PROPERTY: + $key = self::readString($buffer, $offset); + $value = self::readString($buffer, $offset); + $properties['userProperties'][$key] = $value; + break; + case self::PROPERTY_TOPIC_ALIAS_MAXIMUM: + $properties['topicAliasMaximum'] = \unpack('n', \substr($buffer, $offset, 2))[1]; + $offset += 2; + break; + case self::PROPERTY_SERVER_KEEP_ALIVE: + $properties['serverKeepAlive'] = \unpack('n', \substr($buffer, $offset, 2))[1]; + $offset += 2; + break; + case self::PROPERTY_REASON_STRING: + $properties['reasonString'] = self::readString($buffer, $offset); + break; + default: + return $properties; + } + } + + return $properties; + } +} diff --git a/tests/Messaging/Adapter/Push/AppwriteTest.php b/tests/Messaging/Adapter/Push/AppwriteTest.php new file mode 100644 index 00000000..12fdb932 --- /dev/null +++ b/tests/Messaging/Adapter/Push/AppwriteTest.php @@ -0,0 +1,185 @@ +startBroker(['device-token-1', 'device-token-2']); + + try { + $adapter = new Appwrite( + endpoint: '127.0.0.1:' . $broker['port'], + signingKey: self::SIGNING_KEY, + tls: false, + ); + + $message = new Push( + to: ['device-token-1', 'device-token-2'], + title: 'Hi', + body: 'Hello', + data: ['k' => 'v'], + ); + + $response = $adapter->send($message); + + $this->assertSame(2, $response['deliveredTo']); + $this->assertSame('push', $response['type']); + $this->assertCount(2, $response['results']); + + foreach ($response['results'] as $result) { + $this->assertSame('success', $result['status']); + $this->assertSame('', $result['error']); + } + + $captured = $this->stopBroker($broker); + + $this->assertCount(2, $captured['publishes']); + $this->assertSame('appwrite/push/device-token-1', $captured['publishes'][0]['topic']); + $this->assertSame('appwrite/push/device-token-2', $captured['publishes'][1]['topic']); + + $decoded = \json_decode($captured['publishes'][0]['payload'], true); + $this->assertSame('Hi', $decoded['notification']['title']); + $this->assertSame('Hello', $decoded['notification']['body']); + $this->assertSame(['k' => 'v'], $decoded['data']); + + $this->assertSame('server', $captured['connect']['username']); + $this->assertNotEmpty($captured['connect']['password']); + $this->assertStringStartsWith('appwrite-server-', $captured['connect']['clientId']); + } finally { + $this->stopBroker($broker, suppress: true); + } + } + + public function testReportsExpiredTokenOnBrokerReasonCode(): void + { + $broker = $this->startBroker(['live-token'], rejectTokens: ['stale-token']); + + try { + $adapter = new Appwrite( + endpoint: '127.0.0.1:' . $broker['port'], + signingKey: self::SIGNING_KEY, + tls: false, + ); + + $message = new Push( + to: ['live-token', 'stale-token'], + title: 'Hi', + body: 'Hello', + ); + + $response = $adapter->send($message); + + $this->assertSame(1, $response['deliveredTo']); + $this->assertSame('success', $response['results'][0]['status']); + $this->assertSame('live-token', $response['results'][0]['recipient']); + $this->assertSame('failure', $response['results'][1]['status']); + $this->assertSame('stale-token', $response['results'][1]['recipient']); + $this->assertSame('Expired device token', $response['results'][1]['error']); + } finally { + $this->stopBroker($broker, suppress: true); + } + } + + /** + * @param array $expectTokens + * @param array $rejectTokens + * @return array{port: int, process: resource, captured: string} + */ + private function startBroker(array $expectTokens, array $rejectTokens = []): array + { + $port = $this->pickFreePort(); + $capturePath = \sys_get_temp_dir() . '/appwrite-push-broker-' . \uniqid() . '.json'; + $stateFile = \sys_get_temp_dir() . '/appwrite-push-broker-state-' . \uniqid() . '.json'; + + \file_put_contents($stateFile, \json_encode([ + 'expect' => $expectTokens, + 'reject' => $rejectTokens, + ])); + + $brokerScript = __DIR__ . '/FakeBroker.php'; + + $process = \proc_open( + [PHP_BINARY, $brokerScript, (string)$port, $capturePath, $stateFile], + [ + 0 => ['pipe', 'r'], + 1 => ['file', '/dev/null', 'a'], + 2 => ['file', '/dev/null', 'a'], + ], + $pipes, + ); + + if (!\is_resource($process)) { + $this->fail('Could not start fake broker process'); + } + + $deadline = \microtime(true) + 3; + while (\microtime(true) < $deadline) { + $probe = @\fsockopen('127.0.0.1', $port, $errno, $errstr, 0.1); + if (\is_resource($probe)) { + \fclose($probe); + return [ + 'port' => $port, + 'process' => $process, + 'captured' => $capturePath, + ]; + } + \usleep(50000); + } + + \proc_terminate($process); + \proc_close($process); + $this->fail("Broker on port {$port} did not come up in time"); + } + + /** + * @param array{port: int, process: resource, captured: string} $broker + * @return array{publishes: array, connect: array} + */ + private function stopBroker(array $broker, bool $suppress = false): array + { + if (\is_resource($broker['process'])) { + \proc_terminate($broker['process'], SIGTERM); + $deadline = \microtime(true) + 1; + while (\microtime(true) < $deadline) { + $status = \proc_get_status($broker['process']); + if (!$status['running']) { + break; + } + \usleep(25000); + } + \proc_close($broker['process']); + } + + if (!\file_exists($broker['captured'])) { + if ($suppress) { + return ['publishes' => [], 'connect' => []]; + } + $this->fail("Broker capture file missing: {$broker['captured']}"); + } + + $captured = \json_decode(\file_get_contents($broker['captured']), true); + @\unlink($broker['captured']); + + return $captured ?: ['publishes' => [], 'connect' => []]; + } + + private function pickFreePort(): int + { + $sock = \stream_socket_server('tcp://127.0.0.1:0', $errno, $errstr); + if (!$sock) { + $this->fail("Could not bind ephemeral port: {$errstr}"); + } + $name = \stream_socket_get_name($sock, false); + \fclose($sock); + + return (int)\explode(':', $name)[1]; + } +} diff --git a/tests/Messaging/Adapter/Push/FakeBroker.php b/tests/Messaging/Adapter/Push/FakeBroker.php new file mode 100644 index 00000000..b25b53e3 --- /dev/null +++ b/tests/Messaging/Adapter/Push/FakeBroker.php @@ -0,0 +1,109 @@ + [], + 'publishes' => [], +]; + +\register_shutdown_function(function () use (&$captured, $capturePath) { + \file_put_contents($capturePath, \json_encode($captured)); +}); + +\pcntl_async_signals(true); +foreach ([SIGTERM, SIGINT] as $signal) { + \pcntl_signal($signal, function () use (&$captured, $capturePath) { + \file_put_contents($capturePath, \json_encode($captured)); + exit(0); + }); +} + +$server = \stream_socket_server("tcp://127.0.0.1:{$port}", $errno, $errstr); +if (!$server) { + \fwrite(STDERR, "Could not bind: {$errstr}\n"); + exit(1); +} + +\stream_set_blocking($server, false); + +// @phpstan-ignore-next-line +while (true) { // Exits only via SIGTERM handler above. + $client = @\stream_socket_accept($server, 5); + if (!$client) { + continue; + } + + \stream_set_timeout($client, 5); + + $buffer = ''; + + while (!\feof($client)) { + $chunk = @\fread($client, 4096); + if ($chunk === '' || $chunk === false) { + $info = \stream_get_meta_data($client); + if (!empty($info['timed_out'])) { + break; + } + continue; + } + + $buffer .= $chunk; + + while (($packet = MQTT::decodePacket($buffer)) !== null) { + switch ($packet['type']) { + case MQTT::PACKET_CONNECT: + $parsed = MQTT::parseConnect($packet['payload']); + $captured['connect'] = [ + 'clientId' => $parsed['clientId'], + 'username' => (string)$parsed['username'], + 'password' => (string)$parsed['password'], + ]; + \fwrite($client, MQTT::encodeConnack(MQTT::REASON_SUCCESS)); + break; + + case MQTT::PACKET_PUBLISH: + $parsed = MQTT::parsePublish($packet['payload'], $packet['flags']); + $captured['publishes'][] = [ + 'topic' => $parsed['topic'], + 'payload' => $parsed['payload'], + 'qos' => $parsed['qos'], + ]; + + $reason = MQTT::REASON_SUCCESS; + foreach ($rejectTokens as $bad) { + if (\str_ends_with($parsed['topic'], '/' . $bad)) { + $reason = 0x10; + break; + } + } + + if ($parsed['qos'] === 1 && $parsed['packetId'] !== null) { + \fwrite($client, MQTT::encodePuback($parsed['packetId'], $reason)); + } + break; + + case MQTT::PACKET_DISCONNECT: + @\fclose($client); + break 3; + + case MQTT::PACKET_PINGREQ: + \fwrite($client, MQTT::encodePingresp()); + break; + + default: + break; + } + } + } + + @\fclose($client); + \file_put_contents($capturePath, \json_encode($captured)); +} diff --git a/tests/Messaging/Helpers/MQTTTest.php b/tests/Messaging/Helpers/MQTTTest.php new file mode 100644 index 00000000..6f72fc41 --- /dev/null +++ b/tests/Messaging/Helpers/MQTTTest.php @@ -0,0 +1,171 @@ + 3600], + ); + + $this->assertNotEmpty($packet); + $this->assertSame(MQTT::PACKET_CONNECT, (\ord($packet[0]) >> 4) & 0x0F); + + $decoded = MQTT::decodePacket($packet); + $this->assertNotNull($decoded); + $this->assertSame(MQTT::PACKET_CONNECT, $decoded['type']); + + $parsed = MQTT::parseConnect($decoded['payload']); + $this->assertSame('MQTT', $parsed['protocol']); + $this->assertSame(5, $parsed['version']); + $this->assertSame('device-abc', $parsed['clientId']); + $this->assertSame('server', $parsed['username']); + $this->assertSame('jwt.value.here', $parsed['password']); + $this->assertSame(30, $parsed['keepAlive']); + $this->assertTrue($parsed['cleanStart']); + $this->assertSame(3600, $parsed['properties']['sessionExpiryInterval']); + } + + public function testEncodesAndParsesPublish(): void + { + $payload = '{"notification":{"title":"Hi"}}'; + $packet = MQTT::encodePublish( + topic: 'appwrite/push/device-token-1', + payload: $payload, + qos: 1, + retain: false, + dup: false, + packetId: 17, + properties: [ + 'messageExpiryInterval' => 86400, + 'contentType' => 'application/json', + ], + ); + + $decoded = MQTT::decodePacket($packet); + $this->assertNotNull($decoded); + $this->assertSame(MQTT::PACKET_PUBLISH, $decoded['type']); + + $parsed = MQTT::parsePublish($decoded['payload'], $decoded['flags']); + $this->assertSame('appwrite/push/device-token-1', $parsed['topic']); + $this->assertSame($payload, $parsed['payload']); + $this->assertSame(1, $parsed['qos']); + $this->assertSame(17, $parsed['packetId']); + $this->assertSame(86400, $parsed['properties']['messageExpiryInterval']); + $this->assertSame('application/json', $parsed['properties']['contentType']); + } + + public function testEncodesAndParsesConnack(): void + { + $packet = MQTT::encodeConnack(MQTT::REASON_SUCCESS, sessionPresent: false, properties: ['serverKeepAlive' => 60]); + $decoded = MQTT::decodePacket($packet); + $this->assertNotNull($decoded); + $this->assertSame(MQTT::PACKET_CONNACK, $decoded['type']); + + $parsed = MQTT::parseConnack($decoded['payload']); + $this->assertSame(MQTT::REASON_SUCCESS, $parsed['reasonCode']); + $this->assertFalse($parsed['sessionPresent']); + $this->assertSame(60, $parsed['properties']['serverKeepAlive']); + } + + public function testEncodesAndParsesPuback(): void + { + $packet = MQTT::encodePuback(42, MQTT::REASON_SUCCESS); + $decoded = MQTT::decodePacket($packet); + $this->assertNotNull($decoded); + $this->assertSame(MQTT::PACKET_PUBACK, $decoded['type']); + + $parsed = MQTT::parsePuback($decoded['payload']); + $this->assertSame(42, $parsed['packetId']); + $this->assertSame(MQTT::REASON_SUCCESS, $parsed['reasonCode']); + } + + public function testEncodesPingreqAndPingresp(): void + { + $req = MQTT::encodePingreq(); + $resp = MQTT::encodePingresp(); + + $decodedReq = MQTT::decodePacket($req); + $decodedResp = MQTT::decodePacket($resp); + + $this->assertSame(MQTT::PACKET_PINGREQ, $decodedReq['type']); + $this->assertSame(MQTT::PACKET_PINGRESP, $decodedResp['type']); + } + + public function testDecodeReturnsNullForPartialBuffer(): void + { + $packet = MQTT::encodePublish('topic', 'body', 0, false, false, null); + $partial = \substr($packet, 0, 1); + + $buffer = $partial; + $this->assertNull(MQTT::decodePacket($buffer)); + $this->assertSame($partial, $buffer); + } + + public function testDecodeConsumesExactlyOnePacketFromConcatenated(): void + { + $first = MQTT::encodePublish('a/b', '1', 0, false, false, null); + $second = MQTT::encodePublish('c/d', '2', 0, false, false, null); + + $buffer = $first . $second; + + $packet = MQTT::decodePacket($buffer); + $this->assertNotNull($packet); + $parsed = MQTT::parsePublish($packet['payload'], $packet['flags']); + $this->assertSame('a/b', $parsed['topic']); + $this->assertSame('1', $parsed['payload']); + + $next = MQTT::decodePacket($buffer); + $this->assertNotNull($next); + $parsedNext = MQTT::parsePublish($next['payload'], $next['flags']); + $this->assertSame('c/d', $parsedNext['topic']); + $this->assertSame('2', $parsedNext['payload']); + + $this->assertSame('', $buffer); + } + + public function testEncodesLargePayloadAcrossMultiByteRemainingLength(): void + { + $bigPayload = \str_repeat('x', 200); + $packet = MQTT::encodePublish('topic/large', $bigPayload, 0, false, false, null); + + $decoded = MQTT::decodePacket($packet); + $this->assertNotNull($decoded); + $parsed = MQTT::parsePublish($decoded['payload'], $decoded['flags']); + $this->assertSame($bigPayload, $parsed['payload']); + } + + public function testSubscribeParsing(): void + { + $topic = 'appwrite/push/device-abc'; + $body = \pack('n', 5) + . \chr(0) + . \pack('n', \strlen($topic)) . $topic + . \chr(0x01); + + $parsed = MQTT::parseSubscribe($body); + + $this->assertSame(5, $parsed['packetId']); + $this->assertCount(1, $parsed['filters']); + $this->assertSame('appwrite/push/device-abc', $parsed['filters'][0]['topic']); + $this->assertSame(1, $parsed['filters'][0]['qos']); + } + + public function testEncodeConnectRejectsLongStrings(): void + { + $tooLong = \str_repeat('a', 65536); + + $this->expectException(\Throwable::class); + MQTT::encodeConnect(clientId: $tooLong); + } +} From ba4ba2e77b80eaf270b5ccb76d70aded7c468ea6 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Sat, 23 May 2026 15:40:28 +1200 Subject: [PATCH 2/3] fix(Push): address PR review on Appwrite Push adapter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Pipeline PUBLISHes up to the broker-advertised Receive Maximum (default 256) and match PUBACKs by packet id. Drops effective send time from N×RTT to ~RTT for large fan-outs. - Verify PUBACK packet id matches the in-flight publish so an out-of-order or duplicate ack cannot attribute success to the wrong device token (Greptile P2, Copilot). - Surface json_encode failures as a RuntimeException instead of silently sending an empty payload to the broker (Greptile P1). - Persistent read buffer on the adapter so coalesced TCP reads do not drop trailing MQTT packets between readPacket() calls (Copilot). - MQTT::encodeConnect throws when a password is supplied without a username (MQTT 5 §3.1.2.9, Greptile P2). - MQTT::encodePublish validates QoS is 0/1/2 instead of silently masking the bits (Copilot). - FakeBroker fixture rewritten on Swoole — drops the pcntl dependency that wasn't installed in the alpine test image, and exercises the same async runtime Appwrite uses in production. Dockerfile installs ext- swoole via PECL for the tests image. - New tests: pipelined send to 64 devices, password-without-username rejection, invalid-QoS rejection. Co-Authored-By: Claude Opus 4.7 (1M context) --- Dockerfile | 9 ++ .../Messaging/Adapter/Push/Appwrite.php | 133 +++++++++++---- src/Utopia/Messaging/Helpers/MQTT.php | 9 +- tests/Messaging/Adapter/Push/AppwriteTest.php | 40 +++++ tests/Messaging/Adapter/Push/FakeBroker.php | 153 +++++++++--------- tests/Messaging/Helpers/MQTTTest.php | 27 ++++ 6 files changed, 256 insertions(+), 115 deletions(-) diff --git a/Dockerfile b/Dockerfile index 5bc1e5e3..39fb4b8c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,6 +19,15 @@ FROM php:8.3.11-cli-alpine3.20 WORKDIR /usr/local/src/ +RUN apk add --no-cache --virtual .build-deps \ + $PHPIZE_DEPS \ + linux-headers \ + openssl-dev \ + && pecl install swoole \ + && docker-php-ext-enable swoole \ + && apk del .build-deps \ + && apk add --no-cache libstdc++ + COPY --from=composer /usr/local/src/vendor /usr/local/src/vendor COPY . /usr/local/src/ diff --git a/src/Utopia/Messaging/Adapter/Push/Appwrite.php b/src/Utopia/Messaging/Adapter/Push/Appwrite.php index a3f1b543..48ca64d2 100644 --- a/src/Utopia/Messaging/Adapter/Push/Appwrite.php +++ b/src/Utopia/Messaging/Adapter/Push/Appwrite.php @@ -34,6 +34,20 @@ class Appwrite extends PushAdapter private int $packetId = 0; + /** + * Persistent read buffer carrying over bytes the decoder didn't yet consume. + * MQTT packets can be coalesced into a single TCP read and we'd otherwise + * lose them between calls to readPacket(). + */ + private string $readBuffer = ''; + + /** + * Max number of unacknowledged PUBLISHes in flight at any time. MQTT 5's + * Receive Maximum default is 65535 but most real brokers advertise a smaller + * value in CONNACK; we honor whichever is smaller after handshake. + */ + private int $receiveMaximum = 256; + public function __construct( private string $endpoint, private string $signingKey, @@ -70,14 +84,47 @@ protected function process(PushMessage $message): array try { $this->handshake($socket); + $this->pipelinedPublish($socket, $message->getTo(), $payload, $expiry, $response); + + try { + $this->write($socket, MQTT::encodeDisconnect()); + } catch (\Throwable) { + // Best effort; some brokers may have already closed the socket. + } + } finally { + $this->close($socket); + } - foreach ($message->getTo() as $token) { - $topic = $this->topicForToken($token); + return $response->toArray(); + } + + /** + * Pipelined PUBLISH/PUBACK loop. + * + * Sends up to `receiveMaximum` PUBLISH packets without waiting for an + * acknowledgment, then drains PUBACKs as they arrive, matching each by + * packet id. Refills the in-flight window after each ack until every + * device has been sent. This keeps throughput proportional to socket + * bandwidth rather than to network RTT — important when fanning out to + * thousands of devices per request. + * + * @param resource $socket + * @param array $tokens + */ + private function pipelinedPublish($socket, array $tokens, string $payload, int $expiry, Response $response): void + { + $inflight = []; + $cursor = 0; + $total = \count($tokens); + + while ($cursor < $total || !empty($inflight)) { + while ($cursor < $total && \count($inflight) < $this->receiveMaximum) { + $token = $tokens[$cursor++]; + $packetId = $this->nextPacketId(); try { - $packetId = $this->nextPacketId(); $packet = MQTT::encodePublish( - topic: $topic, + topic: $this->topicForToken($token), payload: $payload, qos: 1, retain: false, @@ -88,38 +135,45 @@ protected function process(PushMessage $message): array 'contentType' => 'application/json', ], ); - $this->write($socket, $packet); - $ack = $this->readPacket($socket); - if ($ack['type'] !== MQTT::PACKET_PUBACK) { - $response->addResult($token, 'Broker did not acknowledge PUBLISH'); - continue; - } - - $parsed = MQTT::parsePuback($ack['payload']); - if ($parsed['reasonCode'] !== MQTT::REASON_SUCCESS) { - $error = $this->errorForReasonCode($parsed['reasonCode']); - $response->addResult($token, $error); - continue; - } - - $response->incrementDeliveredTo(); - $response->addResult($token); + $inflight[$packetId] = $token; } catch (\Throwable $error) { $response->addResult($token, $error->getMessage()); } } + if (empty($inflight)) { + continue; + } + try { - $this->write($socket, MQTT::encodeDisconnect()); - } catch (\Throwable) { - // Best effort; some brokers may have already closed the socket. + $ack = $this->readPacket($socket); + } catch (\Throwable $error) { + foreach ($inflight as $token) { + $response->addResult($token, $error->getMessage()); + } + return; } - } finally { - $this->close($socket); - } - return $response->toArray(); + if ($ack['type'] !== MQTT::PACKET_PUBACK) { + continue; + } + + $parsed = MQTT::parsePuback($ack['payload']); + $token = $inflight[$parsed['packetId']] ?? null; + if ($token === null) { + continue; + } + unset($inflight[$parsed['packetId']]); + + if ($parsed['reasonCode'] !== MQTT::REASON_SUCCESS) { + $response->addResult($token, $this->errorForReasonCode($parsed['reasonCode'])); + continue; + } + + $response->incrementDeliveredTo(); + $response->addResult($token); + } } /** @@ -175,7 +229,12 @@ private function buildPayload(PushMessage $message): string }; } - return \json_encode($envelope, JSON_UNESCAPED_SLASHES); + $json = \json_encode($envelope, JSON_UNESCAPED_SLASHES); + if ($json === false) { + throw new \RuntimeException('Failed to encode push payload: ' . \json_last_error_msg()); + } + + return $json; } private function resolveExpiry(PushMessage $message): int @@ -291,6 +350,11 @@ private function handshake($socket): void if ($connack['reasonCode'] !== MQTT::REASON_SUCCESS) { throw new \RuntimeException("Broker rejected CONNECT (reason {$connack['reasonCode']})"); } + + $brokerLimit = (int)($connack['properties']['receiveMaximum'] ?? 0); + if ($brokerLimit > 0) { + $this->receiveMaximum = \min($this->receiveMaximum, $brokerLimit); + } } private function issueServerJwt(): string @@ -313,8 +377,12 @@ private function issueServerJwt(): string */ private function readPacket($socket): array { - $buffer = ''; while (true) { + $packet = MQTT::decodePacket($this->readBuffer); + if ($packet !== null) { + return $packet; + } + $chunk = @\fread($socket, 4096); if ($chunk === false || $chunk === '') { if (\feof($socket)) { @@ -329,12 +397,7 @@ private function readPacket($socket): array continue; } - $buffer .= $chunk; - - $packet = MQTT::decodePacket($buffer); - if ($packet !== null) { - return $packet; - } + $this->readBuffer .= $chunk; } } diff --git a/src/Utopia/Messaging/Helpers/MQTT.php b/src/Utopia/Messaging/Helpers/MQTT.php index 4442d162..c066166f 100644 --- a/src/Utopia/Messaging/Helpers/MQTT.php +++ b/src/Utopia/Messaging/Helpers/MQTT.php @@ -75,6 +75,9 @@ public static function encodeConnect( $flags |= 0x02; } if ($password !== null) { + if ($username === null) { + throw new \InvalidArgumentException('MQTT 5 §3.1.2.9 forbids setting a password without a username.'); + } $flags |= 0x40; } if ($username !== null) { @@ -163,11 +166,15 @@ public static function encodePublish( ?int $packetId = null, array $properties = [] ): string { + if ($qos < 0 || $qos > 2) { + throw new \InvalidArgumentException("MQTT QoS must be 0, 1, or 2 ({$qos} given)"); + } + $flags = 0; if ($dup) { $flags |= 0x08; } - $flags |= ($qos & 0x03) << 1; + $flags |= $qos << 1; if ($retain) { $flags |= 0x01; } diff --git a/tests/Messaging/Adapter/Push/AppwriteTest.php b/tests/Messaging/Adapter/Push/AppwriteTest.php index 12fdb932..4d927667 100644 --- a/tests/Messaging/Adapter/Push/AppwriteTest.php +++ b/tests/Messaging/Adapter/Push/AppwriteTest.php @@ -58,6 +58,46 @@ public function testSendPublishesToDeviceTopicAndCountsAck(): void } } + public function testPipelinesPublishesToManyDevices(): void + { + $tokens = []; + for ($i = 0; $i < 64; $i++) { + $tokens[] = "device-{$i}"; + } + + $broker = $this->startBroker($tokens); + + try { + $adapter = new Appwrite( + endpoint: '127.0.0.1:' . $broker['port'], + signingKey: self::SIGNING_KEY, + tls: false, + ); + + $message = new Push( + to: $tokens, + title: 'Burst', + body: 'Pipeline test', + ); + + $response = $adapter->send($message); + + $this->assertSame(\count($tokens), $response['deliveredTo']); + $this->assertCount(\count($tokens), $response['results']); + + $captured = $this->stopBroker($broker); + $this->assertCount(\count($tokens), $captured['publishes']); + + $seenTopics = \array_map(fn ($p) => $p['topic'], $captured['publishes']); + \sort($seenTopics); + $expectedTopics = \array_map(fn ($t) => 'appwrite/push/' . $t, $tokens); + \sort($expectedTopics); + $this->assertSame($expectedTopics, $seenTopics); + } finally { + $this->stopBroker($broker, suppress: true); + } + } + public function testReportsExpiredTokenOnBrokerReasonCode(): void { $broker = $this->startBroker(['live-token'], rejectTokens: ['stale-token']); diff --git a/tests/Messaging/Adapter/Push/FakeBroker.php b/tests/Messaging/Adapter/Push/FakeBroker.php index b25b53e3..bdc993ff 100644 --- a/tests/Messaging/Adapter/Push/FakeBroker.php +++ b/tests/Messaging/Adapter/Push/FakeBroker.php @@ -2,8 +2,11 @@ require __DIR__ . '/../../../../vendor/autoload.php'; +use Swoole\Server; +use Swoole\Timer; use Utopia\Messaging\Helpers\MQTT; +$argv = $_SERVER['argv']; [$_, $port, $capturePath, $stateFile] = $argv; $port = (int)$port; $state = \json_decode(\file_get_contents($stateFile), true) ?: []; @@ -14,96 +17,88 @@ 'publishes' => [], ]; -\register_shutdown_function(function () use (&$captured, $capturePath) { +$flush = function () use (&$captured, $capturePath) { \file_put_contents($capturePath, \json_encode($captured)); -}); - -\pcntl_async_signals(true); -foreach ([SIGTERM, SIGINT] as $signal) { - \pcntl_signal($signal, function () use (&$captured, $capturePath) { - \file_put_contents($capturePath, \json_encode($captured)); - exit(0); +}; +$flush(); + +/** @var array $buffers */ +$buffers = []; + +$server = new Server('127.0.0.1', $port, SWOOLE_BASE, SWOOLE_SOCK_TCP); +$server->set([ + 'worker_num' => 1, + 'max_request' => 0, + 'log_level' => SWOOLE_LOG_ERROR, + 'open_eof_check' => false, + 'open_tcp_nodelay' => true, +]); + +$server->on('start', function () { + Timer::after(15000, function () { + \Swoole\Event::exit(); }); -} - -$server = \stream_socket_server("tcp://127.0.0.1:{$port}", $errno, $errstr); -if (!$server) { - \fwrite(STDERR, "Could not bind: {$errstr}\n"); - exit(1); -} - -\stream_set_blocking($server, false); - -// @phpstan-ignore-next-line -while (true) { // Exits only via SIGTERM handler above. - $client = @\stream_socket_accept($server, 5); - if (!$client) { - continue; - } - - \stream_set_timeout($client, 5); +}); - $buffer = ''; +$server->on('close', function (Server $server, int $fd) use (&$buffers, $flush) { + unset($buffers[$fd]); + $flush(); +}); - while (!\feof($client)) { - $chunk = @\fread($client, 4096); - if ($chunk === '' || $chunk === false) { - $info = \stream_get_meta_data($client); - if (!empty($info['timed_out'])) { +$server->on('receive', function (Server $server, int $fd, int $reactorId, string $data) use (&$captured, &$buffers, $rejectTokens, $flush) { + $buffers[$fd] = ($buffers[$fd] ?? '') . $data; + + while (($packet = MQTT::decodePacket($buffers[$fd])) !== null) { + switch ($packet['type']) { + case MQTT::PACKET_CONNECT: + $parsed = MQTT::parseConnect($packet['payload']); + $captured['connect'] = [ + 'clientId' => $parsed['clientId'], + 'username' => (string)$parsed['username'], + 'password' => (string)$parsed['password'], + ]; + $server->send($fd, MQTT::encodeConnack(MQTT::REASON_SUCCESS)); + $flush(); break; - } - continue; - } - $buffer .= $chunk; - - while (($packet = MQTT::decodePacket($buffer)) !== null) { - switch ($packet['type']) { - case MQTT::PACKET_CONNECT: - $parsed = MQTT::parseConnect($packet['payload']); - $captured['connect'] = [ - 'clientId' => $parsed['clientId'], - 'username' => (string)$parsed['username'], - 'password' => (string)$parsed['password'], - ]; - \fwrite($client, MQTT::encodeConnack(MQTT::REASON_SUCCESS)); - break; - - case MQTT::PACKET_PUBLISH: - $parsed = MQTT::parsePublish($packet['payload'], $packet['flags']); - $captured['publishes'][] = [ - 'topic' => $parsed['topic'], - 'payload' => $parsed['payload'], - 'qos' => $parsed['qos'], - ]; - - $reason = MQTT::REASON_SUCCESS; - foreach ($rejectTokens as $bad) { - if (\str_ends_with($parsed['topic'], '/' . $bad)) { - $reason = 0x10; - break; - } + case MQTT::PACKET_PUBLISH: + $parsed = MQTT::parsePublish($packet['payload'], $packet['flags']); + $captured['publishes'][] = [ + 'topic' => $parsed['topic'], + 'payload' => $parsed['payload'], + 'qos' => $parsed['qos'], + ]; + + $reason = MQTT::REASON_SUCCESS; + foreach ($rejectTokens as $bad) { + if (\str_ends_with($parsed['topic'], '/' . $bad)) { + $reason = 0x10; + break; } + } - if ($parsed['qos'] === 1 && $parsed['packetId'] !== null) { - \fwrite($client, MQTT::encodePuback($parsed['packetId'], $reason)); - } - break; + if ($parsed['qos'] === 1 && $parsed['packetId'] !== null) { + $server->send($fd, MQTT::encodePuback($parsed['packetId'], $reason)); + } + $flush(); + break; - case MQTT::PACKET_DISCONNECT: - @\fclose($client); - break 3; + case MQTT::PACKET_DISCONNECT: + $server->close($fd); + $flush(); + Timer::after(50, fn () => \Swoole\Event::exit()); + return; - case MQTT::PACKET_PINGREQ: - \fwrite($client, MQTT::encodePingresp()); - break; + case MQTT::PACKET_PINGREQ: + $server->send($fd, MQTT::encodePingresp()); + break; - default: - break; - } + default: + break; } } +}); - @\fclose($client); - \file_put_contents($capturePath, \json_encode($captured)); -} +$server->start(); + +$flush(); diff --git a/tests/Messaging/Helpers/MQTTTest.php b/tests/Messaging/Helpers/MQTTTest.php index 6f72fc41..01a9cd89 100644 --- a/tests/Messaging/Helpers/MQTTTest.php +++ b/tests/Messaging/Helpers/MQTTTest.php @@ -168,4 +168,31 @@ public function testEncodeConnectRejectsLongStrings(): void $this->expectException(\Throwable::class); MQTT::encodeConnect(clientId: $tooLong); } + + public function testEncodeConnectRejectsPasswordWithoutUsername(): void + { + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessageMatches('/password without a username/i'); + + MQTT::encodeConnect( + clientId: 'device-1', + username: null, + password: 'secret', + ); + } + + public function testEncodePublishRejectsInvalidQos(): void + { + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('MQTT QoS must be 0, 1, or 2'); + + MQTT::encodePublish( + topic: 'a/b', + payload: 'data', + qos: 3, + retain: false, + dup: false, + packetId: 1, + ); + } } From 5e341949c10ebe363698b7bb81ef871526cae1e7 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Sat, 23 May 2026 15:45:56 +1200 Subject: [PATCH 3/3] fix(Dockerfile): install libbrotli for swoole pecl build Swoole 6.x enables brotli compression by default and requires libbrotli-dev at build time + brotli-libs at runtime. Without them the configure step fails with "Package 'libbrotlienc' not found". Co-Authored-By: Claude Opus 4.7 (1M context) --- Dockerfile | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 39fb4b8c..3a6814da 100644 --- a/Dockerfile +++ b/Dockerfile @@ -23,10 +23,11 @@ RUN apk add --no-cache --virtual .build-deps \ $PHPIZE_DEPS \ linux-headers \ openssl-dev \ + brotli-dev \ && pecl install swoole \ && docker-php-ext-enable swoole \ && apk del .build-deps \ - && apk add --no-cache libstdc++ + && apk add --no-cache libstdc++ brotli-libs COPY --from=composer /usr/local/src/vendor /usr/local/src/vendor COPY . /usr/local/src/