buffet: Switch XmppChannel to use asynchronous socket streams
Changed XmppClient from using raw socket file descriptors to
chromeos::Stream in preparation for adding TLS support (via substituting
the regular socket-based FileStream with TlsStream once the TLS handshake
is initiated by the XMPP server).
Also implemented exponential backoff for reconnecting to the server
in case of a network error, as well as a more comprehensive network
read/write mechanism which will tie in better with a stricter XML-based
parser/communication. Renamed XmppClient to XmppChannel after adding
generic interfaces for NotificationChannel and NotificationDelegate,
which will help us implement other notification channels such as
GCM and periodic polling.
BUG=brillo:458
TEST=`FEATURES=test emerge-link buffet`
Tested XMPP operation on DUT.
Change-Id: I88d593692ca56d03356155e12cde2f2942f8391e
Reviewed-on: https://chromium-review.googlesource.com/271151
Trybot-Ready: Alex Vakulenko <avakulenko@chromium.org>
Tested-by: Alex Vakulenko <avakulenko@chromium.org>
Reviewed-by: Nathan Bullock <nathanbullock@google.com>
Reviewed-by: Vitaly Buka <vitalybuka@chromium.org>
Commit-Queue: Alex Vakulenko <avakulenko@chromium.org>
diff --git a/buffet/device_registration_info.cc b/buffet/device_registration_info.cc
index 2fd2bfb..e9f806b 100644
--- a/buffet/device_registration_info.cc
+++ b/buffet/device_registration_info.cc
@@ -27,6 +27,7 @@
#include "buffet/commands/command_manager.h"
#include "buffet/commands/schema_constants.h"
#include "buffet/device_registration_storage_keys.h"
+#include "buffet/notification/xmpp_channel.h"
#include "buffet/org.chromium.Buffet.Manager.h"
#include "buffet/states/state_manager.h"
#include "buffet/utils.h"
@@ -133,14 +134,14 @@
std::unique_ptr<BuffetConfig> config,
const std::shared_ptr<chromeos::http::Transport>& transport,
const std::shared_ptr<StorageInterface>& state_store,
- bool xmpp_enabled,
+ bool notifications_enabled,
org::chromium::Buffet::ManagerAdaptor* manager)
: transport_{transport},
storage_{state_store},
command_manager_{command_manager},
state_manager_{state_manager},
config_{std::move(config)},
- xmpp_enabled_{xmpp_enabled},
+ notifications_enabled_{notifications_enabled},
manager_{manager} {
OnConfigChanged();
command_manager_->AddOnCommandDefChanged(
@@ -364,14 +365,14 @@
LOG(INFO) << "Access token is refreshed for additional " << expires_in
<< " seconds.";
- StartXmpp();
+ StartNotificationChannel();
return true;
}
-void DeviceRegistrationInfo::StartXmpp() {
- if (!xmpp_enabled_) {
- LOG(WARNING) << "XMPP support disabled by flag.";
+void DeviceRegistrationInfo::StartNotificationChannel() {
+ if (!notifications_enabled_) {
+ LOG(WARNING) << "Notification support disabled by flag.";
return;
}
// If no MessageLoop assume we're in unittests.
@@ -380,37 +381,13 @@
return;
}
- if (!fd_watcher_.StopWatchingFileDescriptor()) {
- LOG(WARNING) << "Failed to stop the previous watcher";
- return;
- }
-
- std::unique_ptr<XmppConnection> connection(new XmppConnection());
- if (!connection->Initialize()) {
- LOG(WARNING) << "Failed to connect to XMPP server";
- return;
- }
- xmpp_client_.reset(new XmppClient(device_robot_account_, access_token_,
- std::move(connection)));
- if (!base::MessageLoopForIO::current()->WatchFileDescriptor(
- xmpp_client_->GetFileDescriptor(), true /* persistent */,
- base::MessageLoopForIO::WATCH_READ, &fd_watcher_, this)) {
- LOG(WARNING) << "Failed to watch XMPP file descriptor";
- return;
- }
-
- xmpp_client_->StartStream();
-}
-
-void DeviceRegistrationInfo::OnFileCanReadWithoutBlocking(int fd) {
- if (!xmpp_client_ || xmpp_client_->GetFileDescriptor() != fd)
- return;
- if (!xmpp_client_->Read()) {
- // Authentication failed or the socket was closed.
- if (!fd_watcher_.StopWatchingFileDescriptor())
- LOG(WARNING) << "Failed to stop the watcher";
- return;
- }
+ // TODO(avakulenko): Move this into a notification channel factory and out of
+ // this class completely. Also to be added the secondary (poll) notification
+ // channel.
+ primary_notification_channel_.reset(
+ new XmppChannel{device_robot_account_, access_token_,
+ base::MessageLoop::current()->task_runner()});
+ primary_notification_channel_->Start(this);
}
std::unique_ptr<base::DictionaryValue>
@@ -437,7 +414,18 @@
resource->SetString("location", config_->location());
resource->SetString("modelManifestId", config_->model_id());
resource->SetString("deviceKind", config_->device_kind());
- resource->SetString("channel.supportedType", "xmpp");
+ std::unique_ptr<base::DictionaryValue> channel{new base::DictionaryValue};
+ if (primary_notification_channel_) {
+ channel->SetString("supportedType",
+ primary_notification_channel_->GetName());
+ primary_notification_channel_->AddChannelParameters(channel.get());
+ } else {
+ // TODO(avakulenko): Currently GCD server doesn't support changing supported
+ // channel, so here we cannot use "pull" as supported channel type until
+ // this is fixed. See b/20895223
+ channel->SetString("supportedType", "xmpp");
+ }
+ resource->Set("channel", channel.release());
resource->Set("commandDefs", commands.release());
resource->Set("state", state.release());
@@ -587,7 +575,7 @@
base::TimeDelta::FromSeconds(expires_in);
Save();
- StartXmpp();
+ StartNotificationChannel();
// We're going to respond with our success immediately and we'll StartDevice
// shortly after.
@@ -1067,4 +1055,19 @@
base::Bind(&IgnoreCloudError));
}
+void DeviceRegistrationInfo::OnConnected(const std::string& channel_name) {
+ LOG(INFO) << "Notification channel successfully established over "
+ << channel_name;
+ // TODO(avakulenko): Notify GCD server of changed supported channel.
+}
+
+void DeviceRegistrationInfo::OnDisconnected() {
+ LOG(INFO) << "Notification channel disconnected";
+ // TODO(avakulenko): Notify GCD server of changed supported channel.
+}
+
+void DeviceRegistrationInfo::OnPermanentFailure() {
+ LOG(ERROR) << "Failed to establish notification channel.";
+}
+
} // namespace buffet