浏览代码

Prevent progress update spam

Do not send percentage step updates over the wire if the
percentage does not change significantly.
Gabriel Adrian Samfira 4 年之前
父节点
当前提交
94f6c498c6
共有 2 个文件被更改,包括 44 次插入9 次删除
  1. 33 5
      coriolis/events.py
  2. 11 4
      coriolis/providers/backup_writers.py

+ 33 - 5
coriolis/events.py

@@ -3,6 +3,7 @@
 
 import abc
 import collections
+import copy
 
 from oslo_log import log as logging
 from six import with_metaclass
@@ -13,13 +14,14 @@ from coriolis import constants
 LOG = logging.getLogger(__name__)
 
 _PercStepData = collections.namedtuple(
-    "_PercStepData", "progress_update_id last_value total_steps")
+    "_PercStepData", "progress_update_id last_perc last_value total_steps")
 
 
 class EventManager(object, with_metaclass(abc.ABCMeta)):
 
     def __init__(self, event_handler):
         self._event_handler = event_handler
+        self._perc_steps = {}
 
     def _call_event_handler(self, method_name, *args, **kwargs):
         if self._event_handler:
@@ -50,12 +52,38 @@ class EventManager(object, with_metaclass(abc.ABCMeta)):
             self._call_event_handler(
                 'get_progress_update_identifier', progress_update))
 
-        return _PercStepData(progress_update_id, initial_step, total_steps)
+        perc = 0
+        if initial_step > 0 and total_steps > 0:
+            perc = int(initial_step * 100 // total_steps)
+        self._perc_steps[progress_update_id] = _PercStepData(
+                progress_update_id, perc, initial_step, total_steps)
+
+        return self._perc_steps[progress_update_id]
 
     def set_percentage_step(self, step, new_current_step):
-        self._call_event_handler(
-            'update_progress_update', step.progress_update_id,
-            new_current_step)
+        perc_step = self._perc_steps.get(
+                step.progress_update_id, None)
+        if perc_step is None:
+            return
+
+        if perc_step.last_value > new_current_step:
+            LOG.warn("rollback for perc update %s not allowed" % step.progress_update_id)
+            return
+
+        perc = 0
+        if perc_step.total_steps > 0 and new_current_step > 0:
+            perc = int(new_current_step * 100 // perc_step.total_steps)
+
+        if self._call_event_handler and perc > perc_step.last_perc:
+            self._call_event_handler(
+                'update_progress_update', step.progress_update_id,
+                new_current_step)
+            perc_id = copy.copy(step.progress_update_id)
+            total_steps = perc_step.total_steps
+            del self._perc_steps[step.progress_update_id]
+            del perc_step
+            self._perc_steps[perc_id] = _PercStepData(
+                perc_id, perc, 0, total_steps)
 
     def progress_update(self, message):
         self._call_event_handler(

+ 11 - 4
coriolis/providers/backup_writers.py

@@ -3,6 +3,7 @@
 
 import abc
 import contextlib
+import copy
 import datetime
 import errno
 import os
@@ -328,6 +329,7 @@ class SSHBackupWriterImpl(BaseBackupWriterImpl):
                 raise
             finally:
                 self._sender_q.task_done()
+                del data
 
     def _encoder(self):
         while True:
@@ -643,18 +645,20 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
     def _sender(self):
         while True:
             payload = self._sender_q.get()
+            offset = copy.copy(payload["offset"])
             headers = {
-                "X-Write-Offset": str(payload["offset"]),
-                "X-Client-Token": self._id,
+                "X-Write-Offset": str(offset),
+                "X-Client-Token": copy.copy(self._id),
             }
             if payload.get("encoding", None):
-                headers["content-encoding"] = payload["encoding"]
+                enc = copy.copy(payload["encoding"])
+                headers["content-encoding"] = enc
 
             @utils.retry_on_error()
             def send():
                 self._ensure_session()
                 resp = self._session.post(
-                    self._uri, headers=headers, data=payload["chunk"],
+                    self._uri, headers=headers, data=copy.copy(payload["chunk"]),
                     timeout=CONF.default_requests_timeout
                 )
                 LOG.debug(
@@ -677,6 +681,9 @@ class HTTPBackupWriterImpl(BaseBackupWriterImpl):
                 LOG.exception(err)
                 self._exception = err
                 raise
+            finally:
+                del headers
+                del payload
             self._sender_q.task_done()
 
     @utils.retry_on_error()