From 380bd93cd2cde21a23d074e171e1c4cd6b22ea29 Mon Sep 17 00:00:00 2001 From: Caspian Baska Date: Fri, 17 Dec 2021 10:57:48 +1100 Subject: [PATCH 01/11] ci: update mosquitto image --- config/mosquitto.conf | 1 + docker-compose.yml | 10 +++++----- scripts/generate-secrets | 4 ++-- spec/publishing/mqtt_broker_manager_spec.cr | 2 +- spec/spec_helper.cr | 1 + src/source/publishing/mqtt_publisher.cr | 5 ++++- test | 2 +- 7 files changed, 15 insertions(+), 10 deletions(-) diff --git a/config/mosquitto.conf b/config/mosquitto.conf index 5413956..f6fdb36 100644 --- a/config/mosquitto.conf +++ b/config/mosquitto.conf @@ -1,2 +1,3 @@ +log_type debug listener 1883 test allow_anonymous true diff --git a/docker-compose.yml b/docker-compose.yml index fe61cae..c146178 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -31,7 +31,7 @@ x-rethinkdb-client-env: &rethinkdb-client-env services: test: - image: placeos/service-spec-runner:${CRYSTAL_VERSION:-1.2.1} + image: placeos/service-spec-runner:${CRYSTAL_VERSION:-1.2.2} volumes: - ${PWD}/coverage:/app/coverage - ${PWD}/shard.lock:/app/shard.lock @@ -70,11 +70,11 @@ services: command: "--reporting-disabled" mqtt: - image: eclipse-mosquitto - volumes: - - ./config/mosquitto.conf:/mosquitto/config/mosquitto.conf - restart: always + image: iegomez/mosquitto-go-auth:${MOSQUITTO_IMAGE_TAG:-latest} hostname: mqtt + restart: always + volumes: + - ${PWD}/config/mosquitto.conf:/etc/mosquitto/mosquitto.conf environment: TZ: $TZ diff --git a/scripts/generate-secrets b/scripts/generate-secrets index f8edfe1..914388c 100755 --- a/scripts/generate-secrets +++ b/scripts/generate-secrets @@ -1,8 +1,8 @@ #! /usr/bin/env bash -image="placeos/init:${PLACE_INIT_TAG:-nightly}" +image="placeos/init:nightly" -docker pull "${image}" +# docker pull "${image}" docker run --rm \ -e PLACE_EMAIL=${PLACE_EMAIL:-support@place.tech} \ diff --git a/spec/publishing/mqtt_broker_manager_spec.cr b/spec/publishing/mqtt_broker_manager_spec.cr index 2c66f66..33f31f6 100644 --- a/spec/publishing/mqtt_broker_manager_spec.cr +++ b/spec/publishing/mqtt_broker_manager_spec.cr @@ -2,7 +2,7 @@ require "../spec_helper" module PlaceOS::Source describe MqttBrokerManager do - pending "creates MQTT publishing clients" do + it "creates MQTT publishing clients" do model = test_broker id = "broker-acabsns" model.id = id diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index 59590cf..84ea643 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -19,6 +19,7 @@ def test_broker name: "mosquitto", host: ENV["MQTT_HOST"]?.presence || "localhost", port: ENV["MQTT_PORT"]?.presence.try &.to_i? || 1883, + auth_type: :no_auth, ) end diff --git a/src/source/publishing/mqtt_publisher.cr b/src/source/publishing/mqtt_publisher.cr index 5f78024..6d6ae7f 100644 --- a/src/source/publishing/mqtt_publisher.cr +++ b/src/source/publishing/mqtt_publisher.cr @@ -69,7 +69,10 @@ module PlaceOS::Source # Establish a MQTT connection client = ::MQTT::V3::Client.new(transport) - client.connect + + spawn do + client.connect + end client end diff --git a/test b/test index e658db6..b21b742 100755 --- a/test +++ b/test @@ -15,7 +15,7 @@ trap "trap_ctrlc" 2 ./scripts/generate-secrets -docker-compose pull -q +# docker-compose pull -q docker-compose up --no-deps --detach influxdb From 434652eb5eb6a12fe5dffe1cef4de504a79a6952 Mon Sep 17 00:00:00 2001 From: Caspian Baska Date: Thu, 13 Jan 2022 14:47:39 +1100 Subject: [PATCH 02/11] style: crystal 1.3.0 formatter --- spec/router/control_system_router_spec.cr | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/router/control_system_router_spec.cr b/spec/router/control_system_router_spec.cr index 2d973aa..d847b92 100644 --- a/spec/router/control_system_router_spec.cr +++ b/spec/router/control_system_router_spec.cr @@ -29,7 +29,7 @@ module PlaceOS::Source modules = mock_modules(["custom", nil, "custom", "extra_custom", nil]) cs.modules = modules.compact_map &.id - Router::ControlSystem.system_modules(cs, modules).should eq ({ + Router::ControlSystem.system_modules(cs, modules).should eq({ "mod-0" => { name: "custom", control_system_id: "cs-1245", @@ -63,7 +63,7 @@ module PlaceOS::Source cs.id = "cs-1245" zones = mock_zones cs.zones = zones.compact_map &.id - Router::ControlSystem.system_zones(cs, zones).should eq ({ + Router::ControlSystem.system_zones(cs, zones).should eq({ "org" => "zone-0", "building" => "zone-1", "level" => "zone-2", From f971813da57594c16beb42c8d3ab849c84b387ac Mon Sep 17 00:00:00 2001 From: Caspian Baska Date: Tue, 18 Jan 2022 09:58:37 +1100 Subject: [PATCH 03/11] test: config for mqtt --- Dockerfile | 2 +- config/mosquitto.conf | 17 ++++++++++++++--- docker-compose.yml | 2 +- spec/publishing/mqtt_broker_manager_spec.cr | 4 +++- 4 files changed, 19 insertions(+), 6 deletions(-) diff --git a/Dockerfile b/Dockerfile index 9f308da..b96da36 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -ARG CRYSTAL_VERSION=1.1.1 +ARG CRYSTAL_VERSION=1.3.1 FROM crystallang/crystal:${CRYSTAL_VERSION}-alpine as build WORKDIR /app diff --git a/config/mosquitto.conf b/config/mosquitto.conf index f6fdb36..8f6cfc6 100644 --- a/config/mosquitto.conf +++ b/config/mosquitto.conf @@ -1,3 +1,14 @@ -log_type debug -listener 1883 test -allow_anonymous true +listener 1883 +listener 9001 +protocol websockets + +include_dir /mosquitto +auth_plugin /mosquitto/go-auth.so +auth_opt_backends jwt +auth_opt_jwt_mode remote +auth_opt_jwt_host api +auth_opt_jwt_port 3000 +auth_opt_jwt_getuser_uri /api/engine/v2/mqtt/mqtt_user +auth_opt_jwt_aclcheck_uri /api/engine/v2/mqtt/mqtt_access +auth_opt_jwt_response_mode status +auth_opt_jwt_params_mode form diff --git a/docker-compose.yml b/docker-compose.yml index c146178..10ceb1e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -31,7 +31,7 @@ x-rethinkdb-client-env: &rethinkdb-client-env services: test: - image: placeos/service-spec-runner:${CRYSTAL_VERSION:-1.2.2} + image: placeos/service-spec-runner:${CRYSTAL_VERSION:-1.3.1} volumes: - ${PWD}/coverage:/app/coverage - ${PWD}/shard.lock:/app/shard.lock diff --git a/spec/publishing/mqtt_broker_manager_spec.cr b/spec/publishing/mqtt_broker_manager_spec.cr index 33f31f6..4abfe6a 100644 --- a/spec/publishing/mqtt_broker_manager_spec.cr +++ b/spec/publishing/mqtt_broker_manager_spec.cr @@ -14,7 +14,9 @@ module PlaceOS::Source # Yield to the PublisherManager while publisher_manager.processed.empty? - Fiber.yield + # My guess if its forever looping here its because the creation of a mqtt is blocking + # this then blocks IO somewhere + sleep 100.milliseconds end publisher = publisher_manager.@publishers[id]? From bac06c1a60c71d8c8033f7e3c88441686db68cb1 Mon Sep 17 00:00:00 2001 From: Caspian Baska Date: Wed, 19 Jan 2022 16:11:14 +1100 Subject: [PATCH 04/11] fix(mqtt): correct connection flow --- config/mosquitto.conf | 13 +------ shard.lock | 4 +- shard.yml | 4 ++ spec/publishing/mqtt_broker_manager_spec.cr | 14 ++++--- spec/publishing/mqtt_publisher_spec.cr | 37 ++++++++++++++++++ spec/spec_helper.cr | 2 +- spec/status_events_spec.cr | 40 +++++++++++--------- src/source/publishing/mqtt_broker_manager.cr | 8 +++- src/source/publishing/mqtt_publisher.cr | 39 +++++++++++++------ src/source/publishing/publisher.cr | 4 +- 10 files changed, 115 insertions(+), 50 deletions(-) diff --git a/config/mosquitto.conf b/config/mosquitto.conf index 8f6cfc6..025ee99 100644 --- a/config/mosquitto.conf +++ b/config/mosquitto.conf @@ -1,14 +1,5 @@ listener 1883 +allow_anonymous true listener 9001 protocol websockets - -include_dir /mosquitto -auth_plugin /mosquitto/go-auth.so -auth_opt_backends jwt -auth_opt_jwt_mode remote -auth_opt_jwt_host api -auth_opt_jwt_port 3000 -auth_opt_jwt_getuser_uri /api/engine/v2/mqtt/mqtt_user -auth_opt_jwt_aclcheck_uri /api/engine/v2/mqtt/mqtt_access -auth_opt_jwt_response_mode status -auth_opt_jwt_params_mode form +allow_anonymous true diff --git a/shard.lock b/shard.lock index f6c2ea9..9442975 100644 --- a/shard.lock +++ b/shard.lock @@ -30,7 +30,7 @@ shards: faker: git: https://github.com/askn/faker.git - version: 0.6.0 + version: 0.7.0 flux: git: https://github.com/place-labs/flux.git @@ -82,7 +82,7 @@ shards: placeos-models: git: https://github.com/placeos/models.git - version: 5.15.2 + version: 5.15.3 placeos-resource: git: https://github.com/place-labs/resource.git diff --git a/shard.yml b/shard.yml index 222049d..3ab81d3 100644 --- a/shard.yml +++ b/shard.yml @@ -39,6 +39,10 @@ dependencies: github: spider-gazelle/rethinkdb-orm version: ">= 4.0.0" + retriable: + github: Sija/retriable.cr + version: ~> 0.2 + rwlock: github: spider-gazelle/readers-writer diff --git a/spec/publishing/mqtt_broker_manager_spec.cr b/spec/publishing/mqtt_broker_manager_spec.cr index 4abfe6a..4e86f64 100644 --- a/spec/publishing/mqtt_broker_manager_spec.cr +++ b/spec/publishing/mqtt_broker_manager_spec.cr @@ -4,8 +4,6 @@ module PlaceOS::Source describe MqttBrokerManager do it "creates MQTT publishing clients" do model = test_broker - id = "broker-acabsns" - model.id = id event = Resource::Event.new(:created, model) publisher_manager = MqttBrokerManager.new @@ -14,13 +12,19 @@ module PlaceOS::Source # Yield to the PublisherManager while publisher_manager.processed.empty? - # My guess if its forever looping here its because the creation of a mqtt is blocking - # this then blocks IO somewhere sleep 100.milliseconds end - publisher = publisher_manager.@publishers[id]? + publisher = publisher_manager.@publishers[model.id.as(String)]? publisher.should_not be_nil end + + it "publishes events" do + # Create the publisher manager + # Add an MQTT publisher + # Add an influx publisher + # Ensure both publishers are present + # Check that the event is published + end end end diff --git a/spec/publishing/mqtt_publisher_spec.cr b/spec/publishing/mqtt_publisher_spec.cr index cff21c6..ef19c7d 100644 --- a/spec/publishing/mqtt_publisher_spec.cr +++ b/spec/publishing/mqtt_publisher_spec.cr @@ -2,6 +2,43 @@ require "../spec_helper" module PlaceOS::Source describe MqttPublisher do + describe "#publish" do + it "writes to an MQTT topic" do + publisher = MqttPublisher.new(test_broker) + + state = mock_state( + module_id: "mod-1234", + index: 1, + module_name: "M'Odule", + driver_id: "12345", + control_system_id: "cs-9445", + area_id: "2042", + level_id: "nek", + building_id: "cards", + org_id: "org-donor", + ) + + status_event = Mappings.new(state).status_events?("mod-1234", "power").not_nil!.first + key = MqttPublisher.generate_key(status_event).not_nil! + + results = Channel(JSON::Any).new + + spawn do + client = publisher.new_client + client.subscribe(key) do |_key, payload| + results.send(JSON.parse(String.new(payload))) + client.unsubscribe(key) + end + end + + sleep 10.milliseconds + + publisher.publish(Publisher::Message.new(status_event, "true")) + + results.receive["value"].should be_true + end + end + describe "keys" do it "creates a state event topic" do state = mock_state( diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index 84ea643..c131199 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -20,7 +20,7 @@ def test_broker host: ENV["MQTT_HOST"]?.presence || "localhost", port: ENV["MQTT_PORT"]?.presence.try &.to_i? || 1883, auth_type: :no_auth, - ) + ).save! end module PlaceOS::Source diff --git a/spec/status_events_spec.cr b/spec/status_events_spec.cr index 8617969..0393f91 100644 --- a/spec/status_events_spec.cr +++ b/spec/status_events_spec.cr @@ -3,31 +3,35 @@ require "./spec_helper" module PlaceOS::Source describe StatusEvents do it "passes status events to PublisherManager" do - # module_id = "mod-hello_hello" - # status_key = "power" - # mock_mappings_state = mock_state(module_id: module_id) + module_id = "mod-hello_hello" + status_key = "power" + mock_mappings_state = mock_state(module_id: module_id) - # mock_mappings = Mappings.new(mock_mappings_state) - # mock_publisher_manager = MockManager.new + mock_mappings = Mappings.new(mock_mappings_state) + mock_publisher_manager = MockManager.new + managers : Array(PlaceOS::Source::PublisherManager) = [mock_publisher_manager] of PlaceOS::Source::PublisherManager - # events = StatusEvents.new(mock_mappings, mock_publisher_manager) - # spawn(same_thread: true) { events.start } + events = StatusEvents.new(mock_mappings, managers) + spawn(same_thread: true) { events.start } - # sleep 0.1 + sleep 0.1 - # Redis.open(url: REDIS_URL) do |client| - # client.publish("status/#{module_id}/#{status_key}", "on".to_json) - # end + Redis.open(url: REDIS_URL) do |client| + client.publish("status/#{module_id}/#{status_key}", "on".to_json) + end - # sleep 0.1 + sleep 0.1 - # message = mock_publisher_manager.messages.first? - # message.should_not be_nil - # message = message.not_nil! - # message.key.should eq "placeos/org-donor/state/cards/nek/2042/cs-9445/12345/M'Odule/1/#{status_key}" - # message.payload.should eq expected_payload("on") + message = mock_publisher_manager.messages.first? + message.should_not be_nil + message = message.not_nil! - # events.stop + key = MqttPublisher.generate_key(message.data) + + key.should eq "placeos/org-donor/state/cards/nek/2042/cs-9445/12345/M'Odule/1/#{status_key}" + message.payload.should eq expected_payload("on") + + events.stop end end end diff --git a/src/source/publishing/mqtt_broker_manager.cr b/src/source/publishing/mqtt_broker_manager.cr index 7b847f8..b733736 100644 --- a/src/source/publishing/mqtt_broker_manager.cr +++ b/src/source/publishing/mqtt_broker_manager.cr @@ -45,7 +45,13 @@ module PlaceOS::Source # protected def create_publisher(broker : Model::Broker) : Resource::Result broker_id = broker.id.as(String) - publisher = MqttPublisher.new(broker) + begin + publisher = MqttPublisher.new(broker) + rescue e : IO::Error + Log.error(exception: e) { "failed to connect to broker on #{broker.host}:#{broker.port}" } + return Resource::Result::Error + end + write_publishers do |publishers| # Close off exisiting publisher, if present existing = publishers[broker_id]? diff --git a/src/source/publishing/mqtt_publisher.cr b/src/source/publishing/mqtt_publisher.cr index 6d6ae7f..70df148 100644 --- a/src/source/publishing/mqtt_publisher.cr +++ b/src/source/publishing/mqtt_publisher.cr @@ -2,6 +2,7 @@ require "file" require "json" require "mqtt/v3/client" require "placeos-models/broker" +require "retriable" require "rwlock" require "./publisher" @@ -24,7 +25,7 @@ module PlaceOS::Source end def self.payload(value, broker : PlaceOS::Model::Broker?, timestamp : Time = self.timestamp) - value = broker.sanitize(value) unless broker.nil? + value = broker.sanitize(value) unless broker.nil? || value.nil? Event.new(value, timestamp).to_json end @@ -33,6 +34,10 @@ module PlaceOS::Source protected getter client : ::MQTT::V3::Client def initialize(@broker : PlaceOS::Model::Broker) + @client = new_client + end + + protected def new_client @client = MqttPublisher.client(@broker) end @@ -70,25 +75,34 @@ module PlaceOS::Source # Establish a MQTT connection client = ::MQTT::V3::Client.new(transport) - spawn do - client.connect - end + client.connect( + client_id: broker.id.as(String), + username: broker.username, + password: broker.password, + ) client end protected def publish(message : Message) - if (key = MqttPublisher.generate_key?(message.data)) + if (key = MqttPublisher.generate_key(message.data)) # Sanitize the message payload according the Broker's filters payload = broker_lock.read do MqttPublisher.payload(message.payload, broker) end - case message.data - # Update persistent 'metadata' topic (includes deleting) - in Mappings::Metadata then client.publish(topic: key, payload: payload, retain: true) - # Publish event to the 'status' topic - in Mappings::Status then client.publish(topic: key, payload: payload) + retain = case message.data + # Publish event to the 'status' topic + in Mappings::Metadata then false + # Update persistent 'metadata' topic (includes deleting) + in Mappings::Status then true + end + + Retriable.retry(on: IO::Error | MQTT::Error, on_retry: ->(e : Exception, _attempt : Int32, _elapsed : Time::Span, _next : Time::Span) { + Log.error(exception: e) { "MQTT connection error, reconnecting..." } + new_client + }) do + client.publish(topic: key, payload: payload, retain: retain) end end rescue e @@ -122,7 +136,10 @@ module PlaceOS::Source scope_value = hierarchy_values.first? # Prevent publishing events with unspecified top-level scope - return if scope_value.nil? || scope_value == "_" + if scope_value.nil? || scope_value == "_" + Log.debug { "#{data.status} event for #{data.module_id} ignored due to missing top-level scope" } + return + end # Construct key path dependent on Zone hierarchy subhierarchy = File.join(hierarchy_values[1..]) diff --git a/src/source/publishing/publisher.cr b/src/source/publishing/publisher.cr index 38129c4..9b83d0b 100644 --- a/src/source/publishing/publisher.cr +++ b/src/source/publishing/publisher.cr @@ -4,9 +4,11 @@ module PlaceOS::Source abstract class Publisher Log = ::Log.for(self) - record Message, + record( + Message, data : Mappings::Data, payload : String? + ) getter message_queue : Channel(Message) = Channel(Message).new From 8f9fea2c56c7a41993712617b8d95757a7658b54 Mon Sep 17 00:00:00 2001 From: Caspian Baska Date: Wed, 19 Jan 2022 16:40:49 +1100 Subject: [PATCH 05/11] test(manager): test multiple managers and publication --- spec/manager_spec.cr | 46 +++++++++++++++++++++ spec/publishing/mqtt_broker_manager_spec.cr | 10 +---- spec/spec_helper.cr | 4 ++ spec/status_events_spec.cr | 3 +- 4 files changed, 52 insertions(+), 11 deletions(-) create mode 100644 spec/manager_spec.cr diff --git a/spec/manager_spec.cr b/spec/manager_spec.cr new file mode 100644 index 0000000..9158f6f --- /dev/null +++ b/spec/manager_spec.cr @@ -0,0 +1,46 @@ +module PlaceOS::Source + describe Manager do + it "provides concurrent publication to several stores" do + publisher_managers = [] of PublisherManager + + publisher_managers << MqttBrokerManager.new + + influx_host, influx_api_key = INFLUX_HOST.not_nil!, INFLUX_API_KEY.not_nil! + + publisher_managers << InfluxManager.new(influx_host, influx_api_key) + + mock_publisher = MockManager.new + + publisher_managers << mock_publisher + + # Mock data + module_id = "mod-hello_hello" + status_key = "power" + mock_mappings_state = mock_state(module_id: module_id) + mock_mappings = Mappings.new(mock_mappings_state) + + # Start application manager + manager = Manager.new(publisher_managers, mock_mappings) + manager.start + + sleep 20.milliseconds + + Redis.open(url: REDIS_URL) do |client| + client.publish("status/#{module_id}/#{status_key}", "on".to_json) + end + + message = begin + Retriable.retry(max_attempts: 5, base_interval: 20.milliseconds) do + # Wait for a message to be published + mock_publisher.messages.first?.tap { |m| raise "retry" if m.nil? } + end + rescue + nil + end + + message.should_not be_nil + + manager.stop + end + end +end diff --git a/spec/publishing/mqtt_broker_manager_spec.cr b/spec/publishing/mqtt_broker_manager_spec.cr index 4e86f64..816bd84 100644 --- a/spec/publishing/mqtt_broker_manager_spec.cr +++ b/spec/publishing/mqtt_broker_manager_spec.cr @@ -5,7 +5,7 @@ module PlaceOS::Source it "creates MQTT publishing clients" do model = test_broker - event = Resource::Event.new(:created, model) + event = Resource::Event(PlaceOS::Model::Broker).new(:created, model) publisher_manager = MqttBrokerManager.new publisher_manager.@event_channel.send(event) publisher_manager.start @@ -18,13 +18,5 @@ module PlaceOS::Source publisher = publisher_manager.@publishers[model.id.as(String)]? publisher.should_not be_nil end - - it "publishes events" do - # Create the publisher manager - # Add an MQTT publisher - # Add an influx publisher - # Ensure both publishers are present - # Check that the event is published - end end end diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index c131199..b6281fb 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -7,6 +7,7 @@ require "../src/placeos-source" require "../src/source/*" Spec.before_suite do + PlaceOS::Model::Broker.clear ::Log.setup "*", :debug, PlaceOS::LogBackend.log_backend end @@ -15,6 +16,9 @@ def expected_payload(value) end def test_broker + existing = PlaceOS::Model::Broker.where(name: "mosquitto").first? + return existing if existing + PlaceOS::Model::Broker.new( name: "mosquitto", host: ENV["MQTT_HOST"]?.presence || "localhost", diff --git a/spec/status_events_spec.cr b/spec/status_events_spec.cr index 0393f91..5a65a4c 100644 --- a/spec/status_events_spec.cr +++ b/spec/status_events_spec.cr @@ -17,7 +17,7 @@ module PlaceOS::Source sleep 0.1 Redis.open(url: REDIS_URL) do |client| - client.publish("status/#{module_id}/#{status_key}", "on".to_json) + client.publish("status/#{module_id}/#{status_key}", expected_payload("on")) end sleep 0.1 @@ -30,7 +30,6 @@ module PlaceOS::Source key.should eq "placeos/org-donor/state/cards/nek/2042/cs-9445/12345/M'Odule/1/#{status_key}" message.payload.should eq expected_payload("on") - events.stop end end From f3af7ab5f46e07d03efa2fe3448910c2cf5ac065 Mon Sep 17 00:00:00 2001 From: Caspian Baska Date: Thu, 20 Jan 2022 14:05:02 +1100 Subject: [PATCH 06/11] feat(mqtt_publisher): ping every 1/3 of keepalive window --- shard.lock | 6 +----- shard.yml | 9 +++++---- spec/manager_spec.cr | 2 ++ src/source/publishing/influx_publisher.cr | 1 - src/source/publishing/mqtt_publisher.cr | 17 +++++++++++++++++ src/source/status_events.cr | 6 +++--- 6 files changed, 28 insertions(+), 13 deletions(-) diff --git a/shard.lock b/shard.lock index 9442975..c6efaeb 100644 --- a/shard.lock +++ b/shard.lock @@ -86,7 +86,7 @@ shards: placeos-resource: git: https://github.com/place-labs/resource.git - version: 2.2.0 + version: 2.3.0 pool: git: https://github.com/ysbaddaden/pool.git @@ -120,10 +120,6 @@ shards: git: https://github.com/place-labs/secrets-env.git version: 1.3.1 - simple_retry: - git: https://github.com/spider-gazelle/simple_retry.git - version: 1.1.1 - tasker: git: https://github.com/spider-gazelle/tasker.git version: 2.0.5 diff --git a/shard.yml b/shard.yml index 3ab81d3..24efd38 100644 --- a/shard.yml +++ b/shard.yml @@ -29,7 +29,7 @@ dependencies: placeos-resource: github: place-labs/resource - version: ~> 2 + version: ~> 2.0 redis: github: stefanwille/crystal-redis @@ -48,10 +48,11 @@ dependencies: secrets-env: github: place-labs/secrets-env - version: ~> 1 + version: ~> 1.0 - simple_retry: - github: spider-gazelle/simple_retry + tasker: + github: spider-gazelle/tasker + version: ~> 2.0 development_dependencies: faker: diff --git a/spec/manager_spec.cr b/spec/manager_spec.cr index 9158f6f..33f2d40 100644 --- a/spec/manager_spec.cr +++ b/spec/manager_spec.cr @@ -1,3 +1,5 @@ +require "./spec_helper" + module PlaceOS::Source describe Manager do it "provides concurrent publication to several stores" do diff --git a/src/source/publishing/influx_publisher.cr b/src/source/publishing/influx_publisher.cr index 992f309..9a15397 100644 --- a/src/source/publishing/influx_publisher.cr +++ b/src/source/publishing/influx_publisher.cr @@ -2,7 +2,6 @@ require "flux" require "mqtt" require "openssl" require "random" -require "simple_retry" require "time" require "./publisher" diff --git a/src/source/publishing/mqtt_publisher.cr b/src/source/publishing/mqtt_publisher.cr index 70df148..406a58e 100644 --- a/src/source/publishing/mqtt_publisher.cr +++ b/src/source/publishing/mqtt_publisher.cr @@ -4,6 +4,7 @@ require "mqtt/v3/client" require "placeos-models/broker" require "retriable" require "rwlock" +require "tasker" require "./publisher" @@ -75,12 +76,28 @@ module PlaceOS::Source # Establish a MQTT connection client = ::MQTT::V3::Client.new(transport) + keep_alive = 60 + client.connect( client_id: broker.id.as(String), + keep_alive: keep_alive, username: broker.username, password: broker.password, ) + close_channel = Channel(Nil).new(1) + + repeating_task = Tasker.every((keep_alive // 3).seconds) do + close_channel.close if client.closed? + end + + # Spawn a helper fiber to cancel the repeating ping task + spawn do + # Block waiting for close event + close_channel.receive? + repeating_task.cancel + end + client end diff --git a/src/source/status_events.cr b/src/source/status_events.cr index 7eccc17..2d47e2d 100644 --- a/src/source/status_events.cr +++ b/src/source/status_events.cr @@ -1,5 +1,5 @@ require "redis" -require "simple_retry" +require "retriable" require "./mappings" require "./publishing/publisher" @@ -23,10 +23,10 @@ module PlaceOS::Source def start self.stopped = false - SimpleRetry.try_to( + Retriable.retry( base_interval: 1.second, max_interval: 5.seconds, - randomise: 500.milliseconds + rand_factor: 0.5 ) do begin redis.psubscribe(STATUS_CHANNEL_PATTERN) do |callbacks| From 1f24c400d18e18f605701a8529b5386b754ac1f3 Mon Sep 17 00:00:00 2001 From: Caspian Baska Date: Fri, 21 Jan 2022 12:46:59 +1100 Subject: [PATCH 07/11] chore(control_system_router): update to `Model.find_all` --- src/source/router/control_system_router.cr | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/source/router/control_system_router.cr b/src/source/router/control_system_router.cr index 521bfa4..52ccae4 100644 --- a/src/source/router/control_system_router.cr +++ b/src/source/router/control_system_router.cr @@ -146,13 +146,13 @@ module PlaceOS::Source::Router # Get zones protected def self._system_zones(control_system : PlaceOS::Model::ControlSystem) zone_list = control_system.zones || [] of String - PlaceOS::Model::Zone.get_all(zone_list) + PlaceOS::Model::Zone.find_all(zone_list) end # Get modules protected def self._system_modules(control_system : PlaceOS::Model::ControlSystem) module_ids = control_system.modules || [] of String - PlaceOS::Model::Module.get_all(module_ids) + PlaceOS::Model::Module.find_all(module_ids) end end end From 5af7b367ec8f1536df38be9b645dfa2e9f3de40b Mon Sep 17 00:00:00 2001 From: Caspian Baska Date: Fri, 21 Jan 2022 13:03:22 +1100 Subject: [PATCH 08/11] build(dockerfile): update to 1.3.2 --- Dockerfile | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index b96da36..731bf3e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -ARG CRYSTAL_VERSION=1.3.1 +ARG CRYSTAL_VERSION=1.3.2 FROM crystallang/crystal:${CRYSTAL_VERSION}-alpine as build WORKDIR /app @@ -36,7 +36,11 @@ COPY src /app/src RUN UNAME_AT_COMPILE_TIME=true \ PLACE_COMMIT=$PLACE_COMMIT \ PLACE_VERSION=$PLACE_VERSION \ - crystal build --error-trace --release /app/src/app.cr -o /app/source + crystal build \ + --error-trace \ + --release \ + -o /app/source \ + /app/src/app.cr SHELL ["/bin/ash", "-eo", "pipefail", "-c"] From 644a595f64569b0a76df992236159f225d783c5a Mon Sep 17 00:00:00 2001 From: Caspian Baska Date: Fri, 21 Jan 2022 13:46:24 +1100 Subject: [PATCH 09/11] revert: add pull to `generate-secrets` --- scripts/generate-secrets | 2 +- spec/manager_spec.cr | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/generate-secrets b/scripts/generate-secrets index 914388c..164a68a 100755 --- a/scripts/generate-secrets +++ b/scripts/generate-secrets @@ -2,7 +2,7 @@ image="placeos/init:nightly" -# docker pull "${image}" +docker pull "${image}" docker run --rm \ -e PLACE_EMAIL=${PLACE_EMAIL:-support@place.tech} \ diff --git a/spec/manager_spec.cr b/spec/manager_spec.cr index 33f2d40..881f5c6 100644 --- a/spec/manager_spec.cr +++ b/spec/manager_spec.cr @@ -25,7 +25,7 @@ module PlaceOS::Source manager = Manager.new(publisher_managers, mock_mappings) manager.start - sleep 20.milliseconds + sleep 50.milliseconds Redis.open(url: REDIS_URL) do |client| client.publish("status/#{module_id}/#{status_key}", "on".to_json) From 44cc985ae20b0877c305df02dd8ff16053620bc7 Mon Sep 17 00:00:00 2001 From: Caspian Baska Date: Fri, 21 Jan 2022 16:32:52 +1100 Subject: [PATCH 10/11] feat: more logging, unblock publishing --- spec/manager_spec.cr | 5 +++++ spec/spec_helper.cr | 2 +- src/source/manager.cr | 2 +- src/source/publishing/influx_publisher.cr | 2 +- src/source/publishing/mqtt_publisher.cr | 4 +++- src/source/status_events.cr | 11 ++++++----- 6 files changed, 17 insertions(+), 9 deletions(-) diff --git a/spec/manager_spec.cr b/spec/manager_spec.cr index 881f5c6..2c67842 100644 --- a/spec/manager_spec.cr +++ b/spec/manager_spec.cr @@ -3,6 +3,9 @@ require "./spec_helper" module PlaceOS::Source describe Manager do it "provides concurrent publication to several stores" do + # Create a test broker + test_broker + publisher_managers = [] of PublisherManager publisher_managers << MqttBrokerManager.new @@ -31,6 +34,8 @@ module PlaceOS::Source client.publish("status/#{module_id}/#{status_key}", "on".to_json) end + sleep 50.milliseconds + message = begin Retriable.retry(max_attempts: 5, base_interval: 20.milliseconds) do # Wait for a message to be published diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index b6281fb..7f6f2d2 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -8,7 +8,7 @@ require "../src/source/*" Spec.before_suite do PlaceOS::Model::Broker.clear - ::Log.setup "*", :debug, PlaceOS::LogBackend.log_backend + ::Log.setup "*", :trace, PlaceOS::LogBackend.log_backend end def expected_payload(value) diff --git a/src/source/manager.cr b/src/source/manager.cr index d0a2f3f..ae492f3 100644 --- a/src/source/manager.cr +++ b/src/source/manager.cr @@ -35,7 +35,7 @@ module PlaceOS::Source @started = true Log.info { "registering Publishers" } - publisher_managers.each &.start + publisher_managers.each(&.start) # Acquire the Zones (hierarchy) first Log.info { "starting Zone router" } diff --git a/src/source/publishing/influx_publisher.cr b/src/source/publishing/influx_publisher.cr index 9a15397..11a5a26 100644 --- a/src/source/publishing/influx_publisher.cr +++ b/src/source/publishing/influx_publisher.cr @@ -44,7 +44,7 @@ module PlaceOS::Source def publish(message : Publisher::Message) points = self.class.transform(message) points.each do |point| - Log.debug { { + Log.trace { { measurement: point.measurement, timestamp: point.timestamp.to_s, tags: point.tags.to_json, diff --git a/src/source/publishing/mqtt_publisher.cr b/src/source/publishing/mqtt_publisher.cr index 406a58e..2b1f36f 100644 --- a/src/source/publishing/mqtt_publisher.cr +++ b/src/source/publishing/mqtt_publisher.cr @@ -115,7 +115,9 @@ module PlaceOS::Source in Mappings::Status then true end - Retriable.retry(on: IO::Error | MQTT::Error, on_retry: ->(e : Exception, _attempt : Int32, _elapsed : Time::Span, _next : Time::Span) { + Log.trace { {message: "writing to MQTT", key: key, retain: retain} } + + Retriable.retry(max_attempts: 20, on: IO::Error | MQTT::Error, on_retry: ->(e : Exception, _attempt : Int32, _elapsed : Time::Span, _next : Time::Span) { Log.error(exception: e) { "MQTT connection error, reconnecting..." } new_client }) do diff --git a/src/source/status_events.cr b/src/source/status_events.cr index 2d47e2d..a8f7f01 100644 --- a/src/source/status_events.cr +++ b/src/source/status_events.cr @@ -24,7 +24,7 @@ module PlaceOS::Source self.stopped = false Retriable.retry( - base_interval: 1.second, + base_interval: 500.milliseconds, max_interval: 5.seconds, rand_factor: 0.5 ) do @@ -55,10 +55,11 @@ module PlaceOS::Source channel: channel, } } - if events - events.each do |event| - message = Publisher::Message.new(event, payload) - publisher_managers.each &.broadcast(message) + events.try &.each do |event| + message = Publisher::Message.new(event, payload) + publisher_managers.each do |manager| + Log.trace { "broadcasting message to #{manager.class}" } + spawn { manager.broadcast(message) } end end end From dd7d5f67979845c7a94e21744459cf0fdaa12593 Mon Sep 17 00:00:00 2001 From: Caspian Baska Date: Fri, 21 Jan 2022 16:37:33 +1100 Subject: [PATCH 11/11] revert: pull images in `test` --- test | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test b/test index b21b742..e658db6 100755 --- a/test +++ b/test @@ -15,7 +15,7 @@ trap "trap_ctrlc" 2 ./scripts/generate-secrets -# docker-compose pull -q +docker-compose pull -q docker-compose up --no-deps --detach influxdb