]> git.proxmox.com Git - proxmox-backup.git/commitdiff
use RateLimitConfig for HttpClient and pull
authorDietmar Maurer <dietmar@proxmox.com>
Mon, 22 Nov 2021 05:26:55 +0000 (06:26 +0100)
committerDietmar Maurer <dietmar@proxmox.com>
Mon, 22 Nov 2021 06:49:41 +0000 (07:49 +0100)
Signed-off-by: Dietmar Maurer <dietmar@proxmox.com>
pbs-client/src/http_client.rs
pbs-client/src/tools/mod.rs
proxmox-backup-client/src/main.rs
src/api2/config/remote.rs
src/api2/pull.rs
src/server/pull.rs

index 61f05f28df25923f8173085177f1e3a794d4a395..a91b5e005a1d8121aa4c24ccec8a639f2893b816 100644 (file)
@@ -24,7 +24,7 @@ use proxmox_http::client::{HttpsConnector, RateLimiter};
 use proxmox_http::uri::build_authority;
 use proxmox_async::broadcast_future::BroadcastFuture;
 
-use pbs_api_types::{Authid, Userid};
+use pbs_api_types::{Authid, Userid, RateLimitConfig};
 use pbs_tools::json::json_object_to_query;
 use pbs_tools::ticket;
 use pbs_tools::percent_encoding::DEFAULT_ENCODE_SET;
@@ -51,8 +51,7 @@ pub struct HttpClientOptions {
     ticket_cache: bool,
     fingerprint_cache: bool,
     verify_cert: bool,
-    rate_limit: Option<u64>,
-    bucket_size: Option<u64>,
+    limit: RateLimitConfig,
 }
 
 impl HttpClientOptions {
@@ -112,13 +111,8 @@ impl HttpClientOptions {
         self
     }
 
-    pub fn rate_limit(mut self, rate_limit:  Option<u64>) -> Self {
-        self.rate_limit = rate_limit;
-        self
-    }
-
-    pub fn bucket_size(mut self, bucket_size:  Option<u64>) -> Self {
-        self.bucket_size = bucket_size;
+    pub fn rate_limit(mut self, rate_limit: RateLimitConfig) -> Self {
+        self.limit = rate_limit;
         self
     }
 }
@@ -133,8 +127,7 @@ impl Default for HttpClientOptions {
             ticket_cache: false,
             fingerprint_cache: false,
             verify_cert: true,
-            rate_limit: None,
-            bucket_size: None,
+            limit: RateLimitConfig::default(), // unlimited
         }
     }
 }
@@ -359,10 +352,18 @@ impl HttpClient {
         httpc.set_connect_timeout(Some(std::time::Duration::new(10, 0)));
         let mut https = HttpsConnector::with_connector(httpc, ssl_connector_builder.build(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME);
 
-        if let Some(rate_limit) = options.rate_limit {
-            let bucket_size = options.bucket_size.unwrap_or_else(|| rate_limit*3);
-            https.set_read_limiter(Some(Arc::new(Mutex::new(RateLimiter::new(rate_limit, bucket_size)))));
-            https.set_write_limiter(Some(Arc::new(Mutex::new(RateLimiter::new(rate_limit, bucket_size)))));
+        if let Some(rate_in) = options.limit.rate_in {
+            let burst_in = options.limit.burst_in.unwrap_or_else(|| rate_in).as_u64();
+            https.set_read_limiter(Some(Arc::new(Mutex::new(
+                RateLimiter::new(rate_in.as_u64(), burst_in)
+            ))));
+        }
+
+        if let Some(rate_out) = options.limit.rate_out {
+            let burst_out = options.limit.burst_out.unwrap_or_else(|| rate_out).as_u64();
+            https.set_write_limiter(Some(Arc::new(Mutex::new(
+                RateLimiter::new(rate_out.as_u64(), burst_out)
+            ))));
         }
 
         let client = Client::builder()
index a38b1c87b832824bb7b3df05beeef06aa9a56a33..f7a253dff6bcf86fa3aa3058745d0ae09eaa9758 100644 (file)
@@ -14,7 +14,7 @@ use proxmox_schema::*;
 use proxmox_router::cli::{complete_file_name, shellword_split};
 use proxmox::tools::fs::file_get_json;
 
-use pbs_api_types::{BACKUP_REPO_URL, Authid, UserWithTokens};
+use pbs_api_types::{BACKUP_REPO_URL, Authid, RateLimitConfig, UserWithTokens};
 use pbs_datastore::BackupDir;
 use pbs_tools::json::json_object_to_query;
 
@@ -135,16 +135,16 @@ pub fn extract_repository_from_map(param: &HashMap<String, String>) -> Option<Ba
 }
 
 pub fn connect(repo: &BackupRepository) -> Result<HttpClient, Error> {
-    connect_do(repo.host(), repo.port(), repo.auth_id(), None, None)
+    let rate_limit = RateLimitConfig::default(); // unlimited
+    connect_do(repo.host(), repo.port(), repo.auth_id(), rate_limit)
         .map_err(|err| format_err!("error building client for repository {} - {}", repo, err))
 }
 
 pub fn connect_rate_limited(
     repo: &BackupRepository,
-    rate: Option<u64>,
-    bucket_size: Option<u64>,
+    rate_limit: RateLimitConfig,
 ) -> Result<HttpClient, Error> {
-    connect_do(repo.host(), repo.port(), repo.auth_id(), rate, bucket_size)
+    connect_do(repo.host(), repo.port(), repo.auth_id(), rate_limit)
         .map_err(|err| format_err!("error building client for repository {} - {}", repo, err))
 }
 
@@ -152,15 +152,13 @@ fn connect_do(
     server: &str,
     port: u16,
     auth_id: &Authid,
-    rate_limit: Option<u64>,
-    bucket_size: Option<u64>,
+    rate_limit: RateLimitConfig,
 ) -> Result<HttpClient, Error> {
     let fingerprint = std::env::var(ENV_VAR_PBS_FINGERPRINT).ok();
 
     let password = get_secret_from_env(ENV_VAR_PBS_PASSWORD)?;
     let options = HttpClientOptions::new_interactive(password, fingerprint)
-        .rate_limit(rate_limit)
-        .bucket_size(bucket_size);
+        .rate_limit(rate_limit);
 
     HttpClient::new(server, port, auth_id, options)
 }
index b4ad166a94f4ae00175f3c9c042b5d07bcc73080..199ab5820f2464084eb6ecb5f740606b36d5334a 100644 (file)
@@ -23,8 +23,9 @@ use pxar::accessor::{MaybeReady, ReadAt, ReadAtOperation};
 use pbs_api_types::{
     BACKUP_ID_SCHEMA, BACKUP_TIME_SCHEMA, BACKUP_TYPE_SCHEMA,
     TRAFFIC_CONTROL_BURST_SCHEMA, TRAFFIC_CONTROL_RATE_SCHEMA,
-    Authid, CryptMode, Fingerprint, GroupListItem, PruneListItem, PruneOptions,
-    SnapshotListItem, StorageStatus,
+    Authid, CryptMode, Fingerprint, GroupListItem, HumanByte,
+    PruneListItem, PruneOptions, RateLimitConfig, SnapshotListItem,
+    StorageStatus,
 };
 use pbs_client::{
     BACKUP_SOURCE_SCHEMA,
@@ -640,8 +641,16 @@ async fn create_backup(
         verify_chunk_size(size)?;
     }
 
-    let rate_limit = param["rate"].as_u64();
-    let bucket_size = param["burst"].as_u64();
+    let rate = match param["rate"].as_str() {
+        Some(s) => Some(s.parse::<HumanByte>()?),
+        None => None,
+    };
+    let burst = match param["burst"].as_str() {
+        Some(s) => Some(s.parse::<HumanByte>()?),
+        None => None,
+    };
+
+    let rate_limit = RateLimitConfig::with_same_inout(rate, burst);
 
     let crypto = crypto_parameters(&param)?;
 
@@ -737,7 +746,7 @@ async fn create_backup(
 
     let backup_time = backup_time_opt.unwrap_or_else(epoch_i64);
 
-    let client = connect_rate_limited(&repo, rate_limit, bucket_size)?;
+    let client = connect_rate_limited(&repo, rate_limit)?;
     record_repository(&repo);
 
     println!("Starting backup: {}/{}/{}", backup_type, backup_id, BackupDir::backup_time_to_string(backup_time)?);
@@ -1092,10 +1101,18 @@ async fn restore(param: Value) -> Result<Value, Error> {
 
     let archive_name = json::required_string_param(&param, "archive-name")?;
 
-    let rate_limit = param["rate"].as_u64();
-    let bucket_size = param["burst"].as_u64();
+    let rate = match param["rate"].as_str() {
+        Some(s) => Some(s.parse::<HumanByte>()?),
+        None => None,
+    };
+    let burst = match param["burst"].as_str() {
+        Some(s) => Some(s.parse::<HumanByte>()?),
+        None => None,
+    };
+
+    let rate_limit = RateLimitConfig::with_same_inout(rate, burst);
 
-    let client = connect_rate_limited(&repo, rate_limit, bucket_size)?;
+    let client = connect_rate_limited(&repo, rate_limit)?;
     record_repository(&repo);
 
     let path = json::required_string_param(&param, "snapshot")?;
index 0e2413a9ab337434c0490377f9db79d8c414ec03..c43b30c44add7f8097dc046f24f5a3e1345f73bd 100644 (file)
@@ -12,7 +12,7 @@ use pbs_client::{HttpClient, HttpClientOptions};
 use pbs_api_types::{
     REMOTE_ID_SCHEMA, REMOTE_PASSWORD_SCHEMA, Remote, RemoteConfig, RemoteConfigUpdater,
     Authid, PROXMOX_CONFIG_DIGEST_SCHEMA, DATASTORE_SCHEMA, GroupListItem,
-    DataStoreListItem, SyncJobConfig, PRIV_REMOTE_AUDIT, PRIV_REMOTE_MODIFY,
+    DataStoreListItem, RateLimitConfig, SyncJobConfig, PRIV_REMOTE_AUDIT, PRIV_REMOTE_MODIFY,
 };
 use pbs_config::sync;
 
@@ -280,8 +280,15 @@ pub fn delete_remote(name: String, digest: Option<String>) -> Result<(), Error>
 }
 
 /// Helper to get client for remote.cfg entry
-pub async fn remote_client(remote: &Remote) -> Result<HttpClient, Error> {
-    let options = HttpClientOptions::new_non_interactive(remote.password.clone(), remote.config.fingerprint.clone());
+pub async fn remote_client(
+    remote: &Remote,
+    limit: Option<RateLimitConfig>,
+) -> Result<HttpClient, Error> {
+    let mut options = HttpClientOptions::new_non_interactive(remote.password.clone(), remote.config.fingerprint.clone());
+
+    if let Some(limit) = limit {
+        options = options.rate_limit(limit);
+    }
 
     let client = HttpClient::new(
         &remote.config.host,
@@ -325,7 +332,7 @@ pub async fn scan_remote_datastores(name: String) -> Result<Vec<DataStoreListIte
                   api_err)
     };
 
-    let client = remote_client(&remote)
+    let client = remote_client(&remote, None)
         .await
         .map_err(map_remote_err)?;
     let api_res = client
@@ -375,7 +382,7 @@ pub async fn scan_remote_groups(name: String, store: String) -> Result<Vec<Group
                   api_err)
     };
 
-    let client = remote_client(&remote)
+    let client = remote_client(&remote, None)
         .await
         .map_err(map_remote_err)?;
     let api_res = client
index 3a8160764185dec5d945e2f7862e658c742fb3d6..aaeed4dea0c98c5c4f877817816b93e626decc81 100644 (file)
@@ -9,7 +9,7 @@ use proxmox_router::{ApiMethod, Router, RpcEnvironment, Permission};
 use proxmox_sys::task_log;
 
 use pbs_api_types::{
-    Authid, SyncJobConfig, GroupFilter, GROUP_FILTER_LIST_SCHEMA,
+    Authid, SyncJobConfig, GroupFilter, RateLimitConfig, GROUP_FILTER_LIST_SCHEMA,
     DATASTORE_SCHEMA, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA,
     PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ,
 };
@@ -51,6 +51,7 @@ impl TryFrom<&SyncJobConfig> for PullParameters {
             sync_job.owner.as_ref().unwrap_or_else(|| Authid::root_auth_id()).clone(),
             sync_job.remove_vanished,
             sync_job.group_filter.clone(),
+            sync_job.limit.clone(),
         )
     }
 }
@@ -156,6 +157,10 @@ pub fn do_sync_job(
                 schema: GROUP_FILTER_LIST_SCHEMA,
                 optional: true,
             },
+            limit: {
+                type: RateLimitConfig,
+                flatten: true,
+            }
         },
     },
     access: {
@@ -174,6 +179,7 @@ async fn pull (
     remote_store: String,
     remove_vanished: Option<bool>,
     group_filter: Option<Vec<GroupFilter>>,
+    limit: RateLimitConfig,
     _info: &ApiMethod,
     rpcenv: &mut dyn RpcEnvironment,
 ) -> Result<String, Error> {
@@ -190,6 +196,7 @@ async fn pull (
         auth_id.clone(),
         remove_vanished,
         group_filter,
+        limit,
     )?;
     let client = pull_params.client().await?;
 
index 97eee1e6d24866ada223612adfdbf0a298ae6a60..acf2c265fee1c802cee810205a71d05b35029ee8 100644 (file)
@@ -14,7 +14,10 @@ use http::StatusCode;
 use proxmox_router::HttpError;
 use proxmox_sys::task_log;
 
-use pbs_api_types::{Authid, GroupFilter, GroupListItem, Remote, SnapshotListItem};
+use pbs_api_types::{
+    Authid, GroupFilter, GroupListItem, RateLimitConfig, Remote,
+    SnapshotListItem,
+};
 
 use pbs_datastore::{BackupDir, BackupInfo, BackupGroup, DataStore, StoreProgress};
 use pbs_datastore::data_blob::DataBlob;
@@ -41,6 +44,7 @@ pub struct PullParameters {
     owner: Authid,
     remove_vanished: bool,
     group_filter: Option<Vec<GroupFilter>>,
+    limit: RateLimitConfig,
 }
 
 impl PullParameters {
@@ -51,6 +55,7 @@ impl PullParameters {
         owner: Authid,
         remove_vanished: Option<bool>,
         group_filter: Option<Vec<GroupFilter>>,
+        limit: RateLimitConfig,
     ) -> Result<Self, Error> {
         let store = DataStore::lookup_datastore(store)?;
 
@@ -66,11 +71,11 @@ impl PullParameters {
             remote_store.to_string(),
         );
 
-        Ok(Self { remote, source, store, owner, remove_vanished, group_filter })
+        Ok(Self { remote, source, store, owner, remove_vanished, group_filter, limit })
     }
 
     pub async fn client(&self) -> Result<HttpClient, Error> {
-        crate::api2::config::remote::remote_client(&self.remote).await
+        crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await
     }
 }