buffet: Hook up XMPP to deliver push notifications to buffet

Now using XMPP not only for the device presence but for delivering
push notifications, specifically COMMAND_CREATED notification and
extracting the command instance from the notification message.
Add the commands received over XMPP to the command execution queue.

The remaining tasks for XMPP (making polling optional, provide
notification channel configuration options for buffet, update the
channel changes on GCD server, etc) are coming in follow-up CLs.

BUG=brillo:458
TEST=`FEATURES=test emerge-link buffet`
     Tested this on the device.

Change-Id: I6ba42e3687563133734aaf36d3802d6f4888f348
Reviewed-on: https://chromium-review.googlesource.com/272782
Trybot-Ready: Alex Vakulenko <avakulenko@chromium.org>
Tested-by: Alex Vakulenko <avakulenko@chromium.org>
Reviewed-by: Vitaly Buka <vitalybuka@chromium.org>
Commit-Queue: Vitaly Buka <vitalybuka@chromium.org>
diff --git a/buffet/buffet.gyp b/buffet/buffet.gyp
index 0d6fd92..ca4dca8 100644
--- a/buffet/buffet.gyp
+++ b/buffet/buffet.gyp
@@ -40,6 +40,7 @@
         'dbus_bindings/org.chromium.Buffet.Manager.xml',
         'dbus_constants.cc',
         'manager.cc',
+        'notification/notification_parser.cc',
         'notification/xml_node.cc',
         'notification/xmpp_channel.cc',
         'notification/xmpp_stream_parser.cc',
@@ -123,6 +124,7 @@
             'commands/schema_utils_unittest.cc',
             'commands/unittest_utils.cc',
             'device_registration_info_unittest.cc',
+            'notification/notification_parser_unittest.cc',
             'notification/xml_node_unittest.cc',
             'notification/xmpp_channel_unittest.cc',
             'notification/xmpp_stream_parser_unittest.cc',
diff --git a/buffet/device_registration_info.cc b/buffet/device_registration_info.cc
index 454853b..3435881 100644
--- a/buffet/device_registration_info.cc
+++ b/buffet/device_registration_info.cc
@@ -844,36 +844,38 @@
 }
 
 void DeviceRegistrationInfo::PublishCommands(const base::ListValue& commands) {
-  const CommandDictionary& command_dictionary =
-      command_manager_->GetCommandDictionary();
-
-  const size_t size{commands.GetSize()};
-  for (size_t i = 0; i < size; ++i) {
-    const base::DictionaryValue* command{nullptr};
-    if (!commands.GetDictionary(i, &command)) {
-      LOG(WARNING) << "No command resource at " << i;
+  for (const base::Value* command : commands) {
+    const base::DictionaryValue* command_dict{nullptr};
+    if (!command->GetAsDictionary(&command_dict)) {
+      LOG(WARNING) << "Not a command dictionary: " << *command;
       continue;
     }
+    PublishCommand(*command_dict);
+  }
+}
 
-    std::string command_id;
-    chromeos::ErrorPtr error;
-    auto command_instance = CommandInstance::FromJson(
-        command, commands::attributes::kCommand_Visibility_Cloud,
-        command_dictionary, &command_id, &error);
-    if (!command_instance) {
-      LOG(WARNING) << "Failed to parse a command with ID: " << command_id;
-      if (!command_id.empty())
-        NotifyCommandAborted(command_id, std::move(error));
-      continue;
-    }
+void DeviceRegistrationInfo::PublishCommand(
+    const base::DictionaryValue& command) {
+  std::string command_id;
+  chromeos::ErrorPtr error;
+  auto command_instance = CommandInstance::FromJson(
+      &command, commands::attributes::kCommand_Visibility_Cloud,
+      command_manager_->GetCommandDictionary(), &command_id, &error);
+  if (!command_instance) {
+    LOG(WARNING) << "Failed to parse a command instance: " << command;
+    if (!command_id.empty())
+      NotifyCommandAborted(command_id, std::move(error));
+    return;
+  }
 
-    // TODO(antonm): Properly process cancellation of commands.
-    if (!command_manager_->FindCommand(command_instance->GetID())) {
-      std::unique_ptr<CommandProxyInterface> cloud_proxy{
-          new CloudCommandProxy(command_instance.get(), this)};
-      command_instance->AddProxy(std::move(cloud_proxy));
-      command_manager_->AddCommand(std::move(command_instance));
-    }
+  // TODO(antonm): Properly process cancellation of commands.
+  if (!command_manager_->FindCommand(command_instance->GetID())) {
+    LOG(INFO) << "New command '" << command_instance->GetName()
+              << "' arrived, ID: " << command_instance->GetID();
+    std::unique_ptr<CommandProxyInterface> cloud_proxy{
+        new CloudCommandProxy(command_instance.get(), this)};
+    command_instance->AddProxy(std::move(cloud_proxy));
+    command_manager_->AddCommand(std::move(command_instance));
   }
 }
 
@@ -952,4 +954,17 @@
   LOG(ERROR) << "Failed to establish notification channel.";
 }
 
+void DeviceRegistrationInfo::OnCommandCreated(
+    const base::DictionaryValue& command) {
+  if (!command.empty()) {
+    // GCD spec indicates that the command parameter in notification object
+    // "may be empty if command size is too big".
+    PublishCommand(command);
+    return;
+  }
+  // TODO(avakulenko): If the command was too big to be delivered over a
+  // notification channel, perform a manual poll from the server here.
+}
+
+
 }  // namespace buffet
diff --git a/buffet/device_registration_info.h b/buffet/device_registration_info.h
index 5a9a1a3..7732da4 100644
--- a/buffet/device_registration_info.h
+++ b/buffet/device_registration_info.h
@@ -93,7 +93,7 @@
     const std::string& subpath = {},
     const chromeos::data_encoding::WebParamList& params = {}) const;
 
-  // Starts GCD device if credentials avalible.
+  // Starts GCD device if credentials available.
   void Start();
 
   // Checks whether we have credentials generated during registration.
@@ -199,6 +199,7 @@
   void PeriodicallyPollCommands();
 
   void PublishCommands(const base::ListValue& commands);
+  void PublishCommand(const base::DictionaryValue& command);
 
   void PublishStateUpdates();
 
@@ -228,6 +229,7 @@
   void OnConnected(const std::string& channel_name) override;
   void OnDisconnected() override;
   void OnPermanentFailure() override;
+  void OnCommandCreated(const base::DictionaryValue& command) override;
 
   // Transient data
   std::string access_token_;
diff --git a/buffet/notification/notification_delegate.h b/buffet/notification/notification_delegate.h
index 06b30d4..b2d7184 100644
--- a/buffet/notification/notification_delegate.h
+++ b/buffet/notification/notification_delegate.h
@@ -5,8 +5,11 @@
 #ifndef BUFFET_NOTIFICATION_NOTIFICATION_DELEGATE_H_
 #define BUFFET_NOTIFICATION_NOTIFICATION_DELEGATE_H_
 
+#include <memory>
 #include <string>
 
+#include <base/values.h>
+
 namespace buffet {
 
 class NotificationDelegate {
@@ -14,6 +17,8 @@
   virtual void OnConnected(const std::string& channel_name) = 0;
   virtual void OnDisconnected() = 0;
   virtual void OnPermanentFailure() = 0;
+  // Called when a new command is sent via the notification channel.
+  virtual void OnCommandCreated(const base::DictionaryValue& command) = 0;
 
  protected:
   virtual ~NotificationDelegate() = default;
diff --git a/buffet/notification/notification_parser.cc b/buffet/notification/notification_parser.cc
new file mode 100644
index 0000000..5885afa
--- /dev/null
+++ b/buffet/notification/notification_parser.cc
@@ -0,0 +1,55 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "buffet/notification/notification_parser.h"
+
+#include <base/logging.h>
+
+namespace buffet {
+
+namespace {
+
+// Processes COMMAND_CREATED notifications.
+bool ParseCommandCreated(const base::DictionaryValue& notification,
+                         NotificationDelegate* delegate) {
+  const base::DictionaryValue* command = nullptr;
+  if (!notification.GetDictionary("command", &command)) {
+    LOG(ERROR) << "COMMAND_CREATED notification is missing 'command' property";
+    return false;
+  }
+
+  delegate->OnCommandCreated(*command);
+  return true;
+}
+
+}  // anonymous namespace
+
+bool ParseNotificationJson(const base::DictionaryValue& notification,
+                           NotificationDelegate* delegate) {
+  CHECK(delegate);
+
+  std::string kind;
+  if (!notification.GetString("kind", &kind) ||
+      kind != "clouddevices#notification") {
+    LOG(WARNING) << "Push notification should have 'kind' property set to "
+                    "clouddevices#notification";
+    return false;
+  }
+
+  std::string type;
+  if (!notification.GetString("type", &type)) {
+    LOG(WARNING) << "Push notification should have 'type' property";
+    return false;
+  }
+
+  if (type == "COMMAND_CREATED")
+    return ParseCommandCreated(notification, delegate);
+
+  // Here we ignore other types of notifications for now.
+  LOG(INFO) << "Ignoring push notification of type " << type;
+  return true;
+}
+
+
+}  // namespace buffet
diff --git a/buffet/notification/notification_parser.h b/buffet/notification/notification_parser.h
new file mode 100644
index 0000000..eb50dc1
--- /dev/null
+++ b/buffet/notification/notification_parser.h
@@ -0,0 +1,24 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef BUFFET_NOTIFICATION_NOTIFICATION_PARSER_H_
+#define BUFFET_NOTIFICATION_NOTIFICATION_PARSER_H_
+
+#include <string>
+
+#include <base/values.h>
+
+#include "buffet/notification/notification_delegate.h"
+
+namespace buffet {
+
+// Parses the notification JSON object received from GCD server and invokes
+// the appropriate method from the |delegate|.
+// Returns false if unexpected or malformed notification is received.
+bool ParseNotificationJson(const base::DictionaryValue& notification,
+                           NotificationDelegate* delegate);
+
+}  // namespace buffet
+
+#endif  // BUFFET_NOTIFICATION_NOTIFICATION_PARSER_H_
diff --git a/buffet/notification/notification_parser_unittest.cc b/buffet/notification/notification_parser_unittest.cc
new file mode 100644
index 0000000..c6be507
--- /dev/null
+++ b/buffet/notification/notification_parser_unittest.cc
@@ -0,0 +1,142 @@
+// Copyright 2015 The Chromium OS Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "buffet/notification/notification_parser.h"
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "buffet/commands/unittest_utils.h"
+
+using testing::Invoke;
+using testing::_;
+
+namespace buffet {
+
+using unittests::CreateDictionaryValue;
+
+class MockNotificationDelegate : public NotificationDelegate {
+ public:
+  MOCK_METHOD1(OnConnected, void(const std::string&));
+  MOCK_METHOD0(OnDisconnected, void());
+  MOCK_METHOD0(OnPermanentFailure, void());
+  MOCK_METHOD1(OnCommandCreated, void(const base::DictionaryValue& command));
+};
+
+class NotificationParserTest : public ::testing::Test {
+ protected:
+  testing::StrictMock<MockNotificationDelegate> delegate_;
+};
+
+TEST_F(NotificationParserTest, CommandCreated) {
+  auto json = CreateDictionaryValue(R"({
+    "kind": "clouddevices#notification",
+    "type": "COMMAND_CREATED",
+    "deviceId": "device_id",
+    "command": {
+      "kind": "clouddevices#command",
+      "deviceId": "device_id",
+      "state": "queued",
+      "name": "storage.list",
+      "parameters": {
+        "path": "/somepath1"
+      },
+      "expirationTimeMs": "1406036174811",
+      "id": "command_id",
+      "creationTimeMs": "1403444174811"
+    },
+    "commandId": "command_id"
+  })");
+
+  base::DictionaryValue command_instance;
+  auto on_command = [&command_instance](const base::DictionaryValue& command) {
+    command_instance.MergeDictionary(&command);
+  };
+
+  EXPECT_CALL(delegate_, OnCommandCreated(_)).WillOnce(Invoke(on_command));
+  EXPECT_TRUE(ParseNotificationJson(*json, &delegate_));
+
+  const char expected_json[] = R"({
+      "kind": "clouddevices#command",
+      "deviceId": "device_id",
+      "state": "queued",
+      "name": "storage.list",
+      "parameters": {
+        "path": "/somepath1"
+      },
+      "expirationTimeMs": "1406036174811",
+      "id": "command_id",
+      "creationTimeMs": "1403444174811"
+    })";
+  EXPECT_JSON_EQ(expected_json, command_instance);
+}
+
+TEST_F(NotificationParserTest, Failure_NoKind) {
+  auto json = CreateDictionaryValue(R"({
+    "type": "COMMAND_CREATED",
+    "deviceId": "device_id",
+    "command": {
+      "kind": "clouddevices#command",
+      "deviceId": "device_id",
+      "state": "queued",
+      "name": "storage.list",
+      "parameters": {
+        "path": "/somepath1"
+      },
+      "expirationTimeMs": "1406036174811",
+      "id": "command_id",
+      "creationTimeMs": "1403444174811"
+    },
+    "commandId": "command_id"
+  })");
+
+  EXPECT_FALSE(ParseNotificationJson(*json, &delegate_));
+}
+
+TEST_F(NotificationParserTest, Failure_NoType) {
+  auto json = CreateDictionaryValue(R"({
+    "kind": "clouddevices#notification",
+    "deviceId": "device_id",
+    "command": {
+      "kind": "clouddevices#command",
+      "deviceId": "device_id",
+      "state": "queued",
+      "name": "storage.list",
+      "parameters": {
+        "path": "/somepath1"
+      },
+      "expirationTimeMs": "1406036174811",
+      "id": "command_id",
+      "creationTimeMs": "1403444174811"
+    },
+    "commandId": "command_id"
+  })");
+
+  EXPECT_FALSE(ParseNotificationJson(*json, &delegate_));
+}
+
+TEST_F(NotificationParserTest, IgnoredNotificationType) {
+  auto json = CreateDictionaryValue(R"({
+    "kind": "clouddevices#notification",
+    "type": "COMMAND_EXPIRED",
+    "deviceId": "device_id",
+    "command": {
+      "kind": "clouddevices#command",
+      "deviceId": "device_id",
+      "state": "queued",
+      "name": "storage.list",
+      "parameters": {
+        "path": "/somepath1"
+      },
+      "expirationTimeMs": "1406036174811",
+      "id": "command_id",
+      "creationTimeMs": "1403444174811"
+    },
+    "commandId": "command_id"
+  })");
+
+  EXPECT_TRUE(ParseNotificationJson(*json, &delegate_));
+}
+
+}  // namespace buffet
diff --git a/buffet/notification/xmpp_channel.cc b/buffet/notification/xmpp_channel.cc
index 3674184..75c8004 100644
--- a/buffet/notification/xmpp_channel.cc
+++ b/buffet/notification/xmpp_channel.cc
@@ -13,6 +13,7 @@
 #include <chromeos/streams/tls_stream.h>
 
 #include "buffet/notification/notification_delegate.h"
+#include "buffet/notification/notification_parser.h"
 #include "buffet/notification/xml_node.h"
 #include "buffet/utils.h"
 
@@ -213,6 +214,10 @@
       }
       break;
     default:
+      if (stanza->name() == "message") {
+        HandleMessageStanza(std::move(stanza));
+        return;
+      }
       LOG(INFO) << "Unexpected XMPP stanza ignored: " << stanza->ToString();
       return;
   }
@@ -222,6 +227,25 @@
   SendMessage("</stream:stream>");
 }
 
+void XmppChannel::HandleMessageStanza(std::unique_ptr<XmlNode> stanza) {
+  const XmlNode* node = stanza->FindFirstChild("push:push/push:data", true);
+  if (!node) {
+    LOG(WARNING) << "XMPP message stanza is missing <push:data> element";
+    return;
+  }
+  std::string data = node->text();
+  std::string json_data;
+  if (!chromeos::data_encoding::Base64Decode(data, &json_data)) {
+    LOG(WARNING) << "Failed to decode base64-encoded message payload: " << data;
+    return;
+  }
+
+  VLOG(2) << "XMPP push notification data: " << json_data;
+  auto json_dict = LoadJsonDict(json_data, nullptr);
+  if (json_dict && delegate_)
+    ParseNotificationJson(*json_dict, delegate_);
+}
+
 void XmppChannel::StartTlsHandshake() {
   stream_->CancelPendingAsyncOperations();
   chromeos::TlsStream::Connect(
diff --git a/buffet/notification/xmpp_channel.h b/buffet/notification/xmpp_channel.h
index 4cf540e..8db127e 100644
--- a/buffet/notification/xmpp_channel.h
+++ b/buffet/notification/xmpp_channel.h
@@ -71,6 +71,7 @@
   void OnStanza(std::unique_ptr<XmlNode> stanza) override;
 
   void HandleStanza(std::unique_ptr<XmlNode> stanza);
+  void HandleMessageStanza(std::unique_ptr<XmlNode> stanza);
   void RestartXmppStream();
 
   void StartTlsHandshake();