1use crate::Error;
59use chrono::{DateTime, NaiveDateTime, Utc};
60use std::collections::HashMap;
61
/// Divider line between the header and the measurements; also the divider
/// written by `BandwidthFile::to_descriptor_string`.
const HEADER_DIV: &str = "=====";
/// Four-character divider that `BandwidthFile::parse` accepts as an
/// alternative to `HEADER_DIV`.
const HEADER_DIV_ALT: &str = "====";
66
/// Statistics read from the `recent_*` header keys of a bandwidth file.
///
/// As filled in by `BandwidthFile::parse`, each field is `None` when its key is
/// absent or its value does not parse as a `u32`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RecentStats {
    /// `recent_consensus_count` header.
    pub consensus_count: Option<u32>,
    /// `recent_priority_relay_count` header.
    pub prioritized_relays: Option<u32>,
    /// `recent_priority_list_count` header.
    pub prioritized_relay_lists: Option<u32>,
    /// `recent_measurement_attempt_count` header.
    pub measurement_attempts: Option<u32>,
    /// `recent_measurement_failure_count` header.
    pub measurement_failures: Option<u32>,
    /// Values of the `recent_measurements_excluded_*_count` header keys.
    pub relay_failures: RelayFailures,
}

/// Values of the `recent_measurements_excluded_*_count` header keys.
///
/// As filled in by `BandwidthFile::parse`, each field is `None` when its key is
/// absent or its value does not parse as a `u32`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RelayFailures {
    /// `recent_measurements_excluded_error_count` header.
    pub no_measurement: Option<u32>,
    /// `recent_measurements_excluded_near_count` header.
    pub insufficient_period: Option<u32>,
    /// `recent_measurements_excluded_few_count` header.
    pub insufficient_measurements: Option<u32>,
    /// `recent_measurements_excluded_old_count` header.
    pub stale: Option<u32>,
}
106
107#[derive(Debug, Clone, Default)]
123pub struct BandwidthMeasurement {
124 pub node_id: String,
126 pub bandwidth: u64,
128 pub nick: Option<String>,
130 pub master_key_ed25519: Option<String>,
132 pub measured_at: Option<DateTime<Utc>>,
134 pub updated_at: Option<DateTime<Utc>>,
136 pub bw_mean: Option<u64>,
138 pub bw_median: Option<u64>,
140 pub desc_bw_avg: Option<u64>,
142 pub desc_bw_obs_last: Option<u64>,
144 pub desc_bw_obs_mean: Option<u64>,
146 pub desc_bw_bur: Option<u64>,
148 pub consensus_bandwidth: Option<u64>,
150 pub consensus_bandwidth_is_unmeasured: Option<bool>,
152 pub success: Option<u32>,
154 pub error_circ: Option<u32>,
156 pub error_stream: Option<u32>,
158 pub error_misc: Option<u32>,
160 pub error_destination: Option<u32>,
162 pub error_second_relay: Option<u32>,
164 pub relay_in_recent_consensus_count: Option<u32>,
166 pub relay_recent_measurement_attempt_count: Option<u32>,
168 pub relay_recent_measurements_excluded_error_count: Option<u32>,
170 pub relay_recent_priority_list_count: Option<u32>,
172 pub extra: HashMap<String, String>,
174}
175
/// A parsed bandwidth file: its timestamp, header and per-relay measurements.
///
/// Built by `BandwidthFile::parse`. Typed header fields below are `None` when
/// the corresponding header key is missing or its value fails to parse; the
/// raw text of every header entry remains available in `header`.
#[derive(Debug, Clone)]
pub struct BandwidthFile {
    /// Time given on the file's first line, in unix seconds.
    pub timestamp: DateTime<Utc>,
    /// Value of the `version` header, or `"1.0.0"` when there is none.
    pub version: String,
    /// Every `key=value` header line, split at the first `=` (later duplicates
    /// overwrite earlier ones). Includes `version` when present.
    pub header: HashMap<String, String>,
    /// Measurements keyed by relay fingerprint (`node_id` without its `$` prefix).
    pub measurements: HashMap<String, BandwidthMeasurement>,
    /// `software` header.
    pub software: Option<String>,
    /// `software_version` header.
    pub software_version: Option<String>,
    /// `earliest_bandwidth` header, parsed as `%Y-%m-%dT%H:%M:%S` in UTC.
    pub earliest_bandwidth: Option<DateTime<Utc>>,
    /// `latest_bandwidth` header, parsed as `%Y-%m-%dT%H:%M:%S` in UTC.
    pub latest_bandwidth: Option<DateTime<Utc>>,
    /// `file_created` header, parsed as `%Y-%m-%dT%H:%M:%S` in UTC.
    pub created_at: Option<DateTime<Utc>>,
    /// `generator_started` header, parsed as `%Y-%m-%dT%H:%M:%S` in UTC.
    pub generated_at: Option<DateTime<Utc>>,
    /// `number_consensus_relays` header.
    pub consensus_size: Option<u32>,
    /// `number_eligible_relays` header.
    pub eligible_count: Option<u32>,
    /// `percent_eligible_relays` header.
    pub eligible_percent: Option<u32>,
    /// `minimum_number_eligible_relays` header.
    pub min_count: Option<u32>,
    /// `minimum_percent_eligible_relays` header.
    pub min_percent: Option<u32>,
    /// `scanner_country` header.
    pub scanner_country: Option<String>,
    /// `destinations_countries` header, split on `,` with each entry trimmed.
    pub destinations_countries: Option<Vec<String>>,
    /// `tor_version` header.
    pub tor_version: Option<String>,
    /// `time_to_report_half_network` header.
    pub time_to_report_half_network: Option<u32>,
    /// Statistics read from the `recent_*` header keys.
    pub recent_stats: RecentStats,
    // Bytes of the text passed to `parse`; exposed through `raw_content()`.
    raw_content: Vec<u8>,
    // Input lines `parse` kept but could not interpret; exposed through
    // `unrecognized_lines()`.
    unrecognized_lines: Vec<String>,
}
269
270impl BandwidthFile {
271 pub fn parse(content: &str) -> Result<Self, Error> {
314 let raw_content = content.as_bytes().to_vec();
315 let mut lines = content.lines();
316
317 let first_line = lines.next().ok_or_else(|| Error::Parse {
318 location: "bandwidth_file".into(),
319 reason: "Empty file".into(),
320 })?;
321
322 let timestamp = parse_unix_timestamp(first_line)?;
323 let mut header = HashMap::new();
324 let mut version = "1.0.0".to_string();
325 let mut version_index = None;
326 let mut index = 0;
327 let mut body_lines = Vec::new();
328 let mut in_body = false;
329
330 for line in lines {
331 let line = line.trim();
332 if line.is_empty() {
333 continue;
334 }
335
336 if in_body {
337 body_lines.push(line);
338 continue;
339 }
340
341 if line == HEADER_DIV || line == HEADER_DIV_ALT {
342 in_body = true;
343 continue;
344 }
345
346 if header.is_empty() && line.contains("node_id=") {
347 body_lines.push(line);
348 in_body = true;
349 continue;
350 }
351
352 if let Some((key, value)) = line.split_once('=') {
353 header.insert(key.to_string(), value.to_string());
354 if key == "version" {
355 version = value.to_string();
356 version_index = Some(index);
357 }
358 index += 1;
359 }
360 }
361
362 if let Some(vi) = version_index {
363 if vi != 0 {
364 return Err(Error::Parse {
365 location: "bandwidth_file".into(),
366 reason: "The 'version' header must be in the second position".into(),
367 });
368 }
369 }
370
371 let software = header.get("software").cloned();
372 let software_version = header.get("software_version").cloned();
373 let earliest_bandwidth = header
374 .get("earliest_bandwidth")
375 .and_then(|s| parse_iso_date(s));
376 let latest_bandwidth = header
377 .get("latest_bandwidth")
378 .and_then(|s| parse_iso_date(s));
379 let created_at = header.get("file_created").and_then(|s| parse_iso_date(s));
380 let generated_at = header
381 .get("generator_started")
382 .and_then(|s| parse_iso_date(s));
383 let consensus_size = header
384 .get("number_consensus_relays")
385 .and_then(|s| s.parse().ok());
386 let eligible_count = header
387 .get("number_eligible_relays")
388 .and_then(|s| s.parse().ok());
389 let eligible_percent = header
390 .get("percent_eligible_relays")
391 .and_then(|s| s.parse().ok());
392 let min_count = header
393 .get("minimum_number_eligible_relays")
394 .and_then(|s| s.parse().ok());
395 let min_percent = header
396 .get("minimum_percent_eligible_relays")
397 .and_then(|s| s.parse().ok());
398 let scanner_country = header.get("scanner_country").cloned();
399 let destinations_countries = header
400 .get("destinations_countries")
401 .map(|s| s.split(',').map(|c| c.trim().to_string()).collect());
402 let tor_version = header.get("tor_version").cloned();
403 let time_to_report_half_network = header
404 .get("time_to_report_half_network")
405 .and_then(|s| s.parse().ok());
406
407 let recent_stats = RecentStats {
408 consensus_count: header
409 .get("recent_consensus_count")
410 .and_then(|s| s.parse().ok()),
411 prioritized_relay_lists: header
412 .get("recent_priority_list_count")
413 .and_then(|s| s.parse().ok()),
414 prioritized_relays: header
415 .get("recent_priority_relay_count")
416 .and_then(|s| s.parse().ok()),
417 measurement_attempts: header
418 .get("recent_measurement_attempt_count")
419 .and_then(|s| s.parse().ok()),
420 measurement_failures: header
421 .get("recent_measurement_failure_count")
422 .and_then(|s| s.parse().ok()),
423 relay_failures: RelayFailures {
424 no_measurement: header
425 .get("recent_measurements_excluded_error_count")
426 .and_then(|s| s.parse().ok()),
427 insufficient_period: header
428 .get("recent_measurements_excluded_near_count")
429 .and_then(|s| s.parse().ok()),
430 insufficient_measurements: header
431 .get("recent_measurements_excluded_few_count")
432 .and_then(|s| s.parse().ok()),
433 stale: header
434 .get("recent_measurements_excluded_old_count")
435 .and_then(|s| s.parse().ok()),
436 },
437 };
438
439 let mut measurements = HashMap::new();
440 let mut unrecognized_lines = Vec::new();
441
442 for line in body_lines {
443 match parse_measurement_line(line) {
444 Ok(measurement) => {
445 if measurements.contains_key(&measurement.node_id) {
446 return Err(Error::Parse {
447 location: "bandwidth_file".into(),
448 reason: format!(
449 "Relay {} is listed multiple times",
450 measurement.node_id
451 ),
452 });
453 }
454 measurements.insert(measurement.node_id.clone(), measurement);
455 }
456 Err(_) => {
457 unrecognized_lines.push(line.to_string());
458 }
459 }
460 }
461
462 Ok(Self {
463 timestamp,
464 version,
465 header,
466 measurements,
467 software,
468 software_version,
469 earliest_bandwidth,
470 latest_bandwidth,
471 created_at,
472 generated_at,
473 consensus_size,
474 eligible_count,
475 eligible_percent,
476 min_count,
477 min_percent,
478 scanner_country,
479 destinations_countries,
480 tor_version,
481 time_to_report_half_network,
482 recent_stats,
483 raw_content,
484 unrecognized_lines,
485 })
486 }
487
488 pub fn raw_content(&self) -> &[u8] {
493 &self.raw_content
494 }
495
496 pub fn unrecognized_lines(&self) -> &[String] {
502 &self.unrecognized_lines
503 }
504
505 pub fn to_descriptor_string(&self) -> String {
516 let mut lines = Vec::new();
517 lines.push(self.timestamp.timestamp().to_string());
518
519 if self.version != "1.0.0" {
520 lines.push(format!("version={}", self.version));
521 for (key, value) in &self.header {
522 if key != "version" {
523 lines.push(format!("{}={}", key, value));
524 }
525 }
526 lines.push(HEADER_DIV.to_string());
527 }
528
529 for measurement in self.measurements.values() {
530 lines.push(measurement_to_string(measurement));
531 }
532
533 lines.join("\n")
534 }
535}
536
537fn parse_unix_timestamp(s: &str) -> Result<DateTime<Utc>, Error> {
538 let ts: i64 = s.trim().parse().map_err(|_| Error::Parse {
539 location: "bandwidth_file".into(),
540 reason: format!("First line should be a unix timestamp, but was '{}'", s),
541 })?;
542 DateTime::from_timestamp(ts, 0).ok_or_else(|| Error::Parse {
543 location: "bandwidth_file".into(),
544 reason: format!("Invalid unix timestamp: {}", ts),
545 })
546}
547
548fn parse_iso_date(s: &str) -> Option<DateTime<Utc>> {
549 NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S")
550 .ok()
551 .map(|dt| dt.and_utc())
552}
553
554fn parse_measurement_line(line: &str) -> Result<BandwidthMeasurement, Error> {
555 let mut measurement = BandwidthMeasurement::default();
556 let mut has_node_id = false;
557
558 for part in line.split_whitespace() {
559 if let Some((key, value)) = part.split_once('=') {
560 match key {
561 "node_id" => {
562 measurement.node_id = value.trim_start_matches('$').to_string();
563 has_node_id = true;
564 }
565 "bw" => measurement.bandwidth = value.parse().unwrap_or(0),
566 "nick" => measurement.nick = Some(value.to_string()),
567 "master_key_ed25519" => measurement.master_key_ed25519 = Some(value.to_string()),
568 "measured_at" => {
569 measurement.measured_at = value
570 .parse::<i64>()
571 .ok()
572 .and_then(|ts| DateTime::from_timestamp(ts, 0))
573 }
574 "updated_at" => {
575 measurement.updated_at = value
576 .parse::<i64>()
577 .ok()
578 .and_then(|ts| DateTime::from_timestamp(ts, 0))
579 }
580 "time" => measurement.measured_at = parse_iso_date(value),
581 "bw_mean" => measurement.bw_mean = value.parse().ok(),
582 "bw_median" => measurement.bw_median = value.parse().ok(),
583 "desc_bw_avg" => measurement.desc_bw_avg = value.parse().ok(),
584 "desc_bw_obs_last" => measurement.desc_bw_obs_last = value.parse().ok(),
585 "desc_bw_obs_mean" => measurement.desc_bw_obs_mean = value.parse().ok(),
586 "desc_bw_bur" => measurement.desc_bw_bur = value.parse().ok(),
587 "consensus_bandwidth" => measurement.consensus_bandwidth = value.parse().ok(),
588 "consensus_bandwidth_is_unmeasured" => {
589 measurement.consensus_bandwidth_is_unmeasured = Some(value == "True")
590 }
591 "success" => measurement.success = value.parse().ok(),
592 "error_circ" => measurement.error_circ = value.parse().ok(),
593 "error_stream" => measurement.error_stream = value.parse().ok(),
594 "error_misc" => measurement.error_misc = value.parse().ok(),
595 "error_destination" => measurement.error_destination = value.parse().ok(),
596 "error_second_relay" => measurement.error_second_relay = value.parse().ok(),
597 "relay_in_recent_consensus_count" => {
598 measurement.relay_in_recent_consensus_count = value.parse().ok()
599 }
600 "relay_recent_measurement_attempt_count" => {
601 measurement.relay_recent_measurement_attempt_count = value.parse().ok()
602 }
603 "relay_recent_measurements_excluded_error_count" => {
604 measurement.relay_recent_measurements_excluded_error_count = value.parse().ok()
605 }
606 "relay_recent_priority_list_count" => {
607 measurement.relay_recent_priority_list_count = value.parse().ok()
608 }
609 _ => {
610 measurement.extra.insert(key.to_string(), value.to_string());
611 }
612 }
613 }
614 }
615
616 if !has_node_id {
617 return Err(Error::Parse {
618 location: "bandwidth_file".into(),
619 reason: "Every measurement must include 'node_id'".into(),
620 });
621 }
622
623 Ok(measurement)
624}
625
626fn measurement_to_string(m: &BandwidthMeasurement) -> String {
627 let mut parts = Vec::new();
628 parts.push(format!("bw={}", m.bandwidth));
629 if let Some(ref nick) = m.nick {
630 parts.push(format!("nick={}", nick));
631 }
632 parts.push(format!("node_id=${}", m.node_id));
633 if let Some(ref key) = m.master_key_ed25519 {
634 parts.push(format!("master_key_ed25519={}", key));
635 }
636 if let Some(bw_mean) = m.bw_mean {
637 parts.push(format!("bw_mean={}", bw_mean));
638 }
639 if let Some(bw_median) = m.bw_median {
640 parts.push(format!("bw_median={}", bw_median));
641 }
642 if let Some(success) = m.success {
643 parts.push(format!("success={}", success));
644 }
645 if let Some(dt) = m.measured_at {
646 parts.push(format!("time={}", dt.format("%Y-%m-%dT%H:%M:%S")));
647 }
648 for (key, value) in &m.extra {
649 parts.push(format!("{}={}", key, value));
650 }
651 parts.join(" ")
652}
653
// Parser tests built from small inline bandwidth-file fixtures. The raw-string
// fixtures are whitespace-sensitive: their lines must stay at column 0.
#[cfg(test)]
mod tests {
    use super::*;

    // Headerless file: the line after the timestamp contains `node_id=`, so the
    // version stays "1.0.0" and the `$` prefix is stripped from map keys.
    #[test]
    fn test_parse_v1_0() {
        let content = r#"1547487689
node_id=$221C91D4C51E4C73CB6A8F0BEE01B0A6BB4A8476 bw=38000 nick=digitalocean1 measured_at=1546325250
node_id=$1F509589F7F70B69A38719A201451CF4B70F89C6 bw=589 nick=CulNoir measured_at=1547441722"#;

        let bw = BandwidthFile::parse(content).unwrap();
        assert_eq!(bw.version, "1.0.0");
        assert_eq!(bw.measurements.len(), 2);
        assert!(bw
            .measurements
            .contains_key("221C91D4C51E4C73CB6A8F0BEE01B0A6BB4A8476"));
        let m = &bw.measurements["221C91D4C51E4C73CB6A8F0BEE01B0A6BB4A8476"];
        assert_eq!(m.bandwidth, 38000);
        assert_eq!(m.nick, Some("digitalocean1".to_string()));
    }

    // Header with `version` first, ISO dates and the five-character divider.
    #[test]
    fn test_parse_v1_2() {
        let content = r#"1547444099
version=1.2.0
earliest_bandwidth=2019-01-04T05:35:29
file_created=2019-01-14T05:35:06
software=sbws
software_version=1.0.2
=====
bw=1 bw_mean=191643 nick=mrkoolltor node_id=$92808CA58D8F32CA34A34C547610869BF4E2A6EC success=10"#;

        let bw = BandwidthFile::parse(content).unwrap();
        assert_eq!(bw.version, "1.2.0");
        assert_eq!(bw.software, Some("sbws".to_string()));
        assert_eq!(bw.software_version, Some("1.0.2".to_string()));
        assert!(bw.earliest_bandwidth.is_some());
        assert!(bw.created_at.is_some());
        assert_eq!(bw.measurements.len(), 1);
    }

    // Header keys mapped into typed fields and into `recent_stats`.
    #[test]
    fn test_parse_v1_4() {
        let content = r#"1555882497
version=1.4.0
scanner_country=US
software=sbws
software_version=1.1.0
recent_consensus_count=34
recent_measurement_attempt_count=86417
time_to_report_half_network=223519
tor_version=0.3.5.10
=====
bw=1 bw_mean=21403 consensus_bandwidth=1000 nick=t7 node_id=$F63DF6AA4F395AD2F5F363333D104279F2171381"#;

        let bw = BandwidthFile::parse(content).unwrap();
        assert_eq!(bw.version, "1.4.0");
        assert_eq!(bw.scanner_country, Some("US".to_string()));
        assert_eq!(bw.tor_version, Some("0.3.5.10".to_string()));
        assert_eq!(bw.time_to_report_half_network, Some(223519));
        assert_eq!(bw.recent_stats.consensus_count, Some(34));
        assert_eq!(bw.recent_stats.measurement_attempts, Some(86417));
    }

    // The same fingerprint on two body lines is a hard parse error.
    #[test]
    fn test_duplicate_relay_error() {
        let content = r#"1547487689
node_id=$221C91D4C51E4C73CB6A8F0BEE01B0A6BB4A8476 bw=38000
node_id=$221C91D4C51E4C73CB6A8F0BEE01B0A6BB4A8476 bw=39000"#;

        let result = BandwidthFile::parse(content);
        assert!(result.is_err());
    }

    // A body line without `node_id` is kept as unrecognized rather than failing the parse.
    #[test]
    fn test_missing_node_id_in_body() {
        let content = r#"1547487689
node_id=$ABC bw=100
bw=38000 nick=test"#;

        let result = BandwidthFile::parse(content);
        assert!(result.is_ok());
        let bw = result.unwrap();
        assert_eq!(bw.measurements.len(), 1);
        assert_eq!(bw.unrecognized_lines.len(), 1);
    }

    // A non-numeric first line is rejected.
    #[test]
    fn test_invalid_timestamp_error() {
        let content = "not_a_timestamp\nnode_id=$ABC bw=100";
        let result = BandwidthFile::parse(content);
        assert!(result.is_err());
    }

    // The four-character divider ends the header just like the five-character one.
    #[test]
    fn test_header_alternate_div() {
        let content = r#"1547444099
version=1.2.0
software=sbws
====
bw=1 nick=test node_id=$92808CA58D8F32CA34A34C547610869BF4E2A6EC"#;

        let bw = BandwidthFile::parse(content).unwrap();
        assert_eq!(bw.version, "1.2.0");
        assert_eq!(bw.software, Some("sbws".to_string()));
        assert_eq!(bw.measurements.len(), 1);
    }

    // A file that is only a timestamp parses with defaults and no measurements.
    #[test]
    fn test_minimal_bandwidth_file() {
        let content = "1410723598";
        let bw = BandwidthFile::parse(content).unwrap();
        assert_eq!(bw.version, "1.0.0");
        assert!(bw.software.is_none());
        assert!(bw.software_version.is_none());
        assert!(bw.earliest_bandwidth.is_none());
        assert!(bw.latest_bandwidth.is_none());
        assert!(bw.created_at.is_none());
        assert!(bw.generated_at.is_none());
        assert!(bw.consensus_size.is_none());
        assert!(bw.eligible_count.is_none());
        assert!(bw.measurements.is_empty());
    }

    // Empty input, words and fractional seconds are all rejected as the first line.
    #[test]
    fn test_invalid_timestamp_variations() {
        let test_values = ["", "boo", "123.4"];
        for value in test_values {
            let result = BandwidthFile::parse(value);
            assert!(result.is_err(), "Expected error for timestamp: {}", value);
        }
    }

    // Several typed measurement fields parsed from a single line.
    #[test]
    fn test_measurement_all_fields() {
        let content = r#"1547487689
node_id=$221C91D4C51E4C73CB6A8F0BEE01B0A6BB4A8476 bw=38000 nick=digitalocean1 master_key_ed25519=abc123 bw_mean=40000 bw_median=39000 success=10 error_circ=1 error_stream=2"#;

        let bw = BandwidthFile::parse(content).unwrap();
        let m = &bw.measurements["221C91D4C51E4C73CB6A8F0BEE01B0A6BB4A8476"];
        assert_eq!(m.bandwidth, 38000);
        assert_eq!(m.nick, Some("digitalocean1".to_string()));
        assert_eq!(m.master_key_ed25519, Some("abc123".to_string()));
        assert_eq!(m.bw_mean, Some(40000));
        assert_eq!(m.bw_median, Some(39000));
        assert_eq!(m.success, Some(10));
        assert_eq!(m.error_circ, Some(1));
        assert_eq!(m.error_stream, Some(2));
    }

    // Country list splitting and the priority / failure `recent_*` counters.
    #[test]
    fn test_v1_4_specific_fields() {
        let content = r#"1555882497
version=1.4.0
scanner_country=US
destinations_countries=ZZ,US,DE
time_to_report_half_network=223519
recent_consensus_count=34
recent_priority_list_count=260
recent_priority_relay_count=86417
recent_measurement_attempt_count=86417
recent_measurement_failure_count=57023
=====
bw=1 node_id=$F63DF6AA4F395AD2F5F363333D104279F2171381"#;

        let bw = BandwidthFile::parse(content).unwrap();
        assert_eq!(bw.scanner_country, Some("US".to_string()));
        assert_eq!(
            bw.destinations_countries,
            Some(vec!["ZZ".to_string(), "US".to_string(), "DE".to_string()])
        );
        assert_eq!(bw.time_to_report_half_network, Some(223519));
        assert_eq!(bw.recent_stats.consensus_count, Some(34));
        assert_eq!(bw.recent_stats.prioritized_relay_lists, Some(260));
        assert_eq!(bw.recent_stats.prioritized_relays, Some(86417));
        assert_eq!(bw.recent_stats.measurement_attempts, Some(86417));
        assert_eq!(bw.recent_stats.measurement_failures, Some(57023));
    }
}