Skip to content

Commit 9d0b1ab

Browse files
Merge branch 'RetroShare:master' into wire-notifyv2
2 parents da1f2bf + 81ec8e1 commit 9d0b1ab

7 files changed

Lines changed: 116 additions & 21 deletions

File tree

src/pqi/pqi_base.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,18 @@ class RsBwRates
5252
{
5353
public:
5454
RsBwRates()
55-
:mRateIn(0), mRateOut(0), mMaxRateIn(0), mMaxRateOut(0), mQueueIn(0), mQueueOut(0) {return;}
55+
: mRateIn(0), mRateOut(0), mMaxRateIn(0), mMaxRateOut(0),
56+
mQueueIn(0), mQueueOut(0), mQueueOutBytes(0),
57+
mTotalIn(0), mTotalOut(0) { return; }
5658
float mRateIn;
5759
float mRateOut;
5860
float mMaxRateIn;
5961
float mMaxRateOut;
6062
int mQueueIn;
6163
int mQueueOut;
64+
uint32_t mQueueOutBytes;
65+
uint64_t mTotalIn;
66+
uint64_t mTotalOut;
6267
};
6368

6469

@@ -239,6 +244,8 @@ class PQInterface: public RateInterface
239244
uint64_t traf_in;
240245
uint64_t traf_out;
241246

247+
virtual float getQueueSize_seconds() { return 0.0f; }
248+
242249
private:
243250

244251
RsPeerId peerId;

src/pqi/pqihandler.cc

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -287,40 +287,59 @@ int pqihandler::ExtractTrafficInfo(std::list<RSTrafficClue>& out_lst,std::li
287287
}
288288

289289
// NEW extern fn to extract rates.
290-
int pqihandler::ExtractRates(std::map<RsPeerId, RsBwRates> &ratemap, RsBwRates &total)
290+
291+
int pqihandler::ExtractRates(std::map<RsPeerId, RsBwRates> &ratemap, RsBwRates &total)
291292
{
293+
/* Initialize standard totals */
292294
total.mMaxRateIn = getMaxRate(true);
293295
total.mMaxRateOut = getMaxRate(false);
294296
total.mRateIn = 0;
295297
total.mRateOut = 0;
296298
total.mQueueIn = 0;
297299
total.mQueueOut = 0;
300+
total.mQueueOutBytes = 0;
301+
total.mTotalIn = 0;
302+
total.mTotalOut = 0;
298303

299304
/* Lock once rates have been retrieved */
300305
RS_STACK_MUTEX(coreMtx); /**************** LOCKED MUTEX ****************/
301306

302307
std::map<RsPeerId, SearchModule *>::iterator it;
308+
303309
for(it = mods.begin(); it != mods.end(); ++it)
304310
{
305311
SearchModule *mod = (it -> second);
306312

307313
RsBwRates peerRates;
314+
315+
/* Call the relay in pqiperson (which calls pqistreamer) */
308316
mod -> pqi -> getRates(peerRates);
309317

318+
/* Accumulate standard statistics */
310319
total.mRateIn += peerRates.mRateIn;
311320
total.mRateOut += peerRates.mRateOut;
312321
total.mQueueIn += peerRates.mQueueIn;
313322
total.mQueueOut += peerRates.mQueueOut;
323+
total.mQueueOutBytes += peerRates.mQueueOutBytes;
314324

325+
/* Accumulate cumulative statistics into global totals */
326+
total.mTotalIn += peerRates.mTotalIn;
327+
total.mTotalOut += peerRates.mTotalOut;
328+
329+
/* Store individual peer rates in the result map */
315330
ratemap[it->first] = peerRates;
316331

332+
/* Clean debug message for this layer */
333+
//RsDbg() << "OUTQUEUEBYTES [pqihandler] Collected Peer: " << it->first << " | Bytes: " << peerRates.mQueueOutBytes;
334+
335+
/* Debug message for extraction layer */
336+
//RsDbg() << "BWSUM Handler [ExtractRates] Peer: " << it->first << " | In: " << peerRates.mTotalIn << " | Out: " << peerRates.mTotalOut;
337+
317338
}
318339

319340
return 1;
320341
}
321342

322-
323-
324343
// internal fn to send updates
325344
int pqihandler::UpdateRates()
326345
{
@@ -464,12 +483,16 @@ int pqihandler::UpdateRates()
464483
{
465484
SearchModule *mod = (it -> second);
466485

467-
// for our down bandwidth we use the calculated value without taking into account the max up provided by peers via BwCtrl
468-
// this is harmless as they will control their up bw on their side
486+
// attn: the bwctrl service only passes to the peers a max in value, therefore we are not aware of our peers max out
487+
// see p3bwctrl and rsconfig.h
488+
489+
// for our down bandwidth we use the calculated value as is, because BwCtrl does not provide a max out from our peers
490+
// this down limit will be passed to our peers via BwCtrl
469491
mod -> pqi -> setMaxRate(true, in_max_bw);
470492

471-
// for our up bandwidth we take into account the max down provided by peers via BwCtrl
472-
// because we don't want to clog our outqueues, the TCP buffers, and the peers inbound queues
493+
// for our up bandwidth we take into account the max in provided by peers via BwCtrl
494+
// we don't want to clog our outqueues, the TCP buffers, and the peers inbound queues
495+
// attn: this up limit will not passed to our peers via BwCtrl
473496
mod -> pqi -> setMaxRate(false, out_max_bw);
474497
if ((rateMap_it = rateMap.find(mod->pqi->PeerId())) != rateMap.end())
475498
if (rateMap_it->second.mAllowedOut > 0)

src/pqi/pqiperson.cc

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -559,16 +559,24 @@ int pqiperson::connect(uint32_t type, const sockaddr_storage &raddr,
559559
return 1;
560560
}
561561

562-
563562
void pqiperson::getRates(RsBwRates &rates)
564563
{
565564
RS_STACK_MUTEX(mPersonMtx);
566565

567-
// get the rate from the active one.
566+
/* Check if the peer connection is established and active */
568567
if ((!active) || (activepqi == NULL))
568+
{
569569
return;
570+
}
570571

572+
/* Forward the request to the active streamer (pqiconnect) */
571573
activepqi->getRates(rates);
574+
575+
/* Clean debug message for the relay layer */
576+
//RsDbg() << "OUTQUEUEBYTES [Person] Peer: " << PeerId() << " | Relaying rate request";
577+
578+
/* Debug to confirm the relay layer receives cumulative totals */
579+
//RsDbg() << "BWSUM Relay [Person] Peer: " << PeerId() << " | In: " << rates.mTotalIn << " | Out: " << rates.mTotalOut;
572580
}
573581

574582
int pqiperson::gatherStatistics(std::list<RSTrafficClue>& out_lst,

src/pqi/pqistreamer.cc

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1372,16 +1372,33 @@ int pqistreamer::getQueueSize_bytes(bool in)
13721372
}
13731373
}
13741374

1375-
void pqistreamer::getRates(RsBwRates &rates)
1375+
void pqistreamer::getRates(RsBwRates &rates)
13761376
{
1377+
/* Call base RateInterface to get basic bandwidth numbers */
13771378
RateInterface::getRates(rates);
13781379

1379-
// no mutex is needed here because this is atomic
1380+
// No mutex is needed here for mIncomingSize as it is atomic
13801381
rates.mQueueIn = mIncomingSize;
13811382

13821383
{
13831384
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
1385+
1386+
/* Fetch standard item count */
13841387
rates.mQueueOut = locked_out_queue_size();
1388+
1389+
/* Extract the actual queue size in bytes */
1390+
/* locked_compute_out_pkt_size() returns the sum of serialized sizes of all items in queue */
1391+
rates.mQueueOutBytes = (uint32_t)locked_compute_out_pkt_size();
1392+
1393+
/* Populate cumulative totals from internal variables */
1394+
rates.mTotalIn = (uint64_t)mTotalRead;
1395+
rates.mTotalOut = (uint64_t)mTotalSent;
1396+
1397+
/* Debug message */
1398+
//RsDbg() << "BWSUM Source [Streamer] Peer: " << PeerId() << " | In: " << rates.mTotalIn << " | Out: " << rates.mTotalOut;
1399+
1400+
/* Debus message */
1401+
//RsDbg() << "OUTQUEUEBYTES [Streamer] Peer: " << PeerId() << " | Bytes: " << rates.mQueueOutBytes;
13851402
}
13861403
}
13871404

src/pqi/pqistreamer.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ class pqistreamer: public PQInterface
8080
virtual float getMaxRate(bool b) ;
8181
virtual float getMaxRate_locked(bool b);
8282

83+
/* Public getters for cumulative traffic */
84+
uint64_t getTotalRead() const { return (uint64_t)mTotalRead; }
85+
uint64_t getTotalSent() const { return (uint64_t)mTotalSent; }
86+
8387
protected:
8488
virtual int reset() ;
8589

@@ -152,8 +156,8 @@ class pqistreamer: public PQInterface
152156
uint32_t mIncomingSize_bytes; // size of Incoming in btyes
153157

154158
// data for network stats.
155-
int mTotalRead;
156-
int mTotalSent;
159+
uint64_t mTotalRead;
160+
uint64_t mTotalSent;
157161

158162
// these are representative (but not exact)
159163
int mCurrRead;

src/retroshare/rsconfig.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,9 @@ struct RsConfigDataRates : RsSerializable
149149
mAllocTs(0),
150150
mRateOut(0), mRateMaxOut(0), mAllowedOut(0),
151151
mAllowedTs(0),
152-
mQueueIn(0), mQueueOut(0)
152+
mQueueIn(0), mQueueOut(0),
153+
mQueueOutBytes(0),
154+
mTotalIn(0), mTotalOut(0)
153155
{}
154156

155157
/* all in kB/s */
@@ -167,6 +169,10 @@ struct RsConfigDataRates : RsSerializable
167169

168170
int mQueueIn;
169171
int mQueueOut;
172+
uint32_t mQueueOutBytes;
173+
174+
uint64_t mTotalIn; // Total bytes received (cumulative)
175+
uint64_t mTotalOut; // Total bytes sent (cumulative)
170176

171177
// RsSerializable interface
172178
void serial_process(RsGenericSerializer::SerializeJob j, RsGenericSerializer::SerializeContext &ctx) {
@@ -184,6 +190,10 @@ struct RsConfigDataRates : RsSerializable
184190

185191
RS_SERIAL_PROCESS(mQueueIn);
186192
RS_SERIAL_PROCESS(mQueueOut);
193+
RS_SERIAL_PROCESS(mQueueOutBytes);
194+
195+
RS_SERIAL_PROCESS(mTotalIn);
196+
RS_SERIAL_PROCESS(mTotalOut);
187197
}
188198
};
189199

src/services/p3bwctrl.cc

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,9 @@ bool p3BandwidthControl::checkAvailableBandwidth()
110110
std::map<RsPeerId, RsBwRates> rateMap;
111111
RsBwRates total;
112112

113+
/* VERIFICATION DEBUG */
114+
//RsDbg() << "OUTQUEUEBYTES [p3bwctrl] checkAvailableBandwidth: triggering ExtractRates";
115+
113116
mPg->ExtractRates(rateMap, total);
114117
std::map<RsPeerId, RsBwRates>::iterator it;
115118
std::map<RsPeerId, BwCtrlData>::iterator bit;
@@ -162,6 +165,12 @@ bool p3BandwidthControl::checkAvailableBandwidth()
162165
bit->second.mRates = it->second;
163166
bit->second.mRateUpdateTs = now;
164167

168+
/* Debug message */
169+
//RsDbg() << "OUTQUEUEBYTES [p3bwctrl] Bridge active for Peer: " << bit->first << " | Bytes: " << bit->second.mRates.mQueueOutBytes;
170+
171+
/* Debug to confirm data reached the Bandwidth Control service */
172+
//RsDbg() << "BWSUM Service [checkAvailableBandwidth] Peer: " << bit->first << " | In: " << bit->second.mRates.mTotalIn << " | Out: " << bit->second.mRates.mTotalOut;
173+
165174
if (updatePeer)
166175
{
167176
#define ALLOC_FACTOR (1.0)
@@ -225,6 +234,7 @@ bool p3BandwidthControl::processIncoming()
225234
return true;
226235
}
227236

237+
228238
int p3BandwidthControl::getTotalBandwidthRates(RsConfigDataRates &rates)
229239
{
230240
RsStackMutex stack(mBwMtx); /****** LOCKED MUTEX *******/
@@ -242,10 +252,22 @@ int p3BandwidthControl::getTotalBandwidthRates(RsConfigDataRates &rates)
242252

243253
rates.mQueueIn = mTotalRates.mQueueIn;
244254
rates.mQueueOut = mTotalRates.mQueueOut;
255+
rates.mQueueOutBytes = mTotalRates.mQueueOutBytes;
256+
257+
/* Copy global cumulative totals to the output structure */
258+
rates.mTotalIn = mTotalRates.mTotalIn;
259+
rates.mTotalOut = mTotalRates.mTotalOut;
260+
261+
/* DEBUG OPTIONNEL pour vérifier le total global */
262+
//RsDbg() << "OUTQUEUEBYTES [p3bwctrl] GLOBAL TOTAL Bridge: " << rates.mQueueOutBytes;
263+
264+
/* Debug message for global totals */
265+
//RsDbg() << "BWSUM Final API [Global Total] | In: " << rates.mTotalIn << " | Out: " << rates.mTotalOut;
245266

246267
return 1;
247268
}
248269

270+
249271
int p3BandwidthControl::getAllBandwidthRates(std::map<RsPeerId, RsConfigDataRates> &ratemap)
250272
{
251273
RsStackMutex stack(mBwMtx); /****** LOCKED MUTEX *******/
@@ -268,10 +290,19 @@ int p3BandwidthControl::getAllBandwidthRates(std::map<RsPeerId, RsConfigDataRate
268290

269291
rates.mQueueIn = bit->second.mRates.mQueueIn;
270292
rates.mQueueOut = bit->second.mRates.mQueueOut;
293+
rates.mQueueOutBytes = bit->second.mRates.mQueueOutBytes;
294+
295+
/* Copy individual cumulative totals to the API structure */
296+
rates.mTotalIn = bit->second.mRates.mTotalIn;
297+
rates.mTotalOut = bit->second.mRates.mTotalOut;
298+
299+
/* Debug message */
300+
//RsDbg() << "OUTQUEUEBYTES [p3bwctrl] Final API Exit for Peer: " << bit->first << " | Bytes: " << rates.mQueueOutBytes;
301+
//RsDbg() << "BWSUM Final API [Peer] " << bit->first << " | In: " << rates.mTotalIn << " | Out: " << rates.mTotalOut;
271302

272303
ratemap[bit->first] = rates;
273304
}
274-
return true ;
305+
return 1 ;
275306

276307

277308
}
@@ -281,11 +312,6 @@ int p3BandwidthControl::ExtractTrafficInfo(std::list<RSTrafficClue>& out_sta
281312
return mPg->ExtractTrafficInfo(out_stats,in_stats) ;
282313
}
283314

284-
285-
286-
287-
288-
289315
int p3BandwidthControl::printRateInfo_locked(std::ostream &out)
290316
{
291317
out << "p3BandwidthControl::printRateInfo_locked()";

0 commit comments

Comments
 (0)