Skip to content

Commit 3120c2f

Browse files
richiwareMiguelCompany
authored andcommitted
Fix GAP messages are not sent when there is no Reader requesting the DATA (#6181)
* Refs #23919. Tests Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #23919. Fix Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #23919. Fix cornercase Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #23919. Apply suggestions Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #23919. Fix jobs needs log Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #23919. Apply suggestions Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> --------- Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
1 parent 2c78c42 commit 3120c2f

File tree

5 files changed

+257
-21
lines changed

5 files changed

+257
-21
lines changed

include/fastdds/rtps/writer/ReaderProxy.h

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ class ReaderProxy
143143
FragmentNumber_t& next_unsent_frag,
144144
SequenceNumber_t& gap_seq,
145145
const SequenceNumber_t& min_seq,
146-
bool& need_reactivate_periodic_heartbeat) const;
146+
bool& need_reactivate_periodic_heartbeat);
147147

148148
/**
149149
* Mark all changes up to the one indicated by seq_num as Acknowledged.
@@ -345,11 +345,38 @@ class ReaderProxy
345345
* Get the highest fully acknowledged sequence number.
346346
* @return the highest fully acknowledged sequence number.
347347
*/
348-
SequenceNumber_t changes_low_mark() const
348+
inline SequenceNumber_t changes_low_mark() const
349349
{
350350
return changes_low_mark_;
351351
}
352352

353+
/*!
354+
* Get the first sequence number not relevant that was removed without reader being informed.
355+
* @return First sequence number.
356+
*/
357+
inline SequenceNumber_t first_irrelevant_removed() const
358+
{
359+
return first_irrelevant_removed_;
360+
}
361+
362+
/*!
363+
* Get the last sequence number not relevant that was removed without reader being informed.
364+
* @return last sequence number.
365+
*/
366+
inline SequenceNumber_t last_irrelevant_removed() const
367+
{
368+
return last_irrelevant_removed_;
369+
}
370+
371+
/*!
372+
* Reset the interval of sequence numbers not relevant that were removed without reader being informed.
373+
*/
374+
inline void reset_irrelevant_removed()
375+
{
376+
first_irrelevant_removed_ = SequenceNumber_t::unknown();
377+
last_irrelevant_removed_ = SequenceNumber_t::unknown();
378+
}
379+
353380
/**
354381
* Change the interval of nack-supression event.
355382
* @param interval Time from data sending to acknack processing.
@@ -450,8 +477,14 @@ class ReaderProxy
450477
//! Last NACKFRAG count.
451478
uint32_t last_nackfrag_count_;
452479

480+
//! Sequence number of the lowest change not fully acknowledged.
453481
SequenceNumber_t changes_low_mark_;
454482

483+
//! First sequence number not relevant that was removed without reader being informed.
484+
SequenceNumber_t first_irrelevant_removed_ {SequenceNumber_t::unknown()};
485+
//! Last sequence number not relevant that was removed without reader being informed.
486+
SequenceNumber_t last_irrelevant_removed_ {SequenceNumber_t::unknown()};
487+
455488
bool active_ = false;
456489

457490
using ChangeIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::iterator;

include/fastdds/rtps/writer/StatefulWriter.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,13 @@ class StatefulWriter : public RTPSWriter
486486
*/
487487
bool ack_timer_expired();
488488

489-
void send_heartbeat_to_all_readers();
489+
/*!
490+
* Send heartbeat to all the remote readers.
491+
* @param force_separating True to send the heartbeat separately for each reader.
492+
* False to send a unique heartbeat to all the readers.
493+
*/
494+
void send_heartbeat_to_all_readers(
495+
bool force_separating);
490496

491497
void deliver_sample_to_intraprocesses(
492498
CacheChange_t* change);
@@ -503,6 +509,10 @@ class StatefulWriter : public RTPSWriter
503509
void prepare_datasharing_delivery(
504510
CacheChange_t* change);
505511

512+
void add_gaps_for_removed_irrelevants(
513+
ReaderProxy& remoteReaderProxy,
514+
RTPSMessageGroup& group);
515+
506516
/**
507517
* Check the StatefulWriter's sequence numbers and add the required GAP messages to the provided message group.
508518
*

src/cpp/rtps/writer/ReaderProxy.cpp

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -227,25 +227,40 @@ void ReaderProxy::add_change(
227227
const ChangeForReader_t& change,
228228
bool is_relevant)
229229
{
230-
assert(change.getSequenceNumber() > changes_low_mark_);
230+
SequenceNumber_t seq_num {change.getSequenceNumber()};
231+
assert(seq_num > changes_low_mark_);
231232
assert(changes_for_reader_.empty() ? true :
232-
change.getSequenceNumber() > changes_for_reader_.back().getSequenceNumber());
233+
seq_num > changes_for_reader_.back().getSequenceNumber());
233234

234235
// Irrelevant changes are not added to the collection
235236
if (!is_relevant)
236237
{
237-
if ( !is_reliable_ &&
238-
changes_low_mark_ + 1 == change.getSequenceNumber())
238+
if (is_reliable_)
239+
{
240+
if (!is_local_reader())
241+
{
242+
if (SequenceNumber_t::unknown() == first_irrelevant_removed_)
243+
{
244+
first_irrelevant_removed_ = seq_num;
245+
last_irrelevant_removed_ = seq_num;
246+
}
247+
else if (seq_num == last_irrelevant_removed_ + 1)
248+
{
249+
last_irrelevant_removed_ = seq_num;
250+
}
251+
}
252+
}
253+
else if (changes_low_mark_ + 1 == seq_num)
239254
{
240-
changes_low_mark_ = change.getSequenceNumber();
255+
changes_low_mark_ = seq_num;
241256
}
242257
return;
243258
}
244259

245260
if (changes_for_reader_.push_back(change) == nullptr)
246261
{
247262
// This should never happen
248-
EPROSIMA_LOG_ERROR(RTPS_READER_PROXY, "Error adding change " << change.getSequenceNumber()
263+
EPROSIMA_LOG_ERROR(RTPS_READER_PROXY, "Error adding change " << seq_num
249264
<< " to reader proxy " << guid());
250265
eprosima::fastdds::dds::Log::Flush();
251266
assert(false);
@@ -281,7 +296,7 @@ bool ReaderProxy::change_is_unsent(
281296
FragmentNumber_t& next_unsent_frag,
282297
SequenceNumber_t& gap_seq,
283298
const SequenceNumber_t& min_seq,
284-
bool& need_reactivate_periodic_heartbeat) const
299+
bool& need_reactivate_periodic_heartbeat)
285300
{
286301
if (seq_num <= changes_low_mark_ || changes_for_reader_.empty())
287302
{
@@ -328,6 +343,21 @@ bool ReaderProxy::change_is_unsent(
328343
gap_seq = SequenceNumber_t::unknown();
329344
}
330345
}
346+
347+
if (SequenceNumber_t::unknown() != first_irrelevant_removed_ &&
348+
SequenceNumber_t::unknown() != gap_seq)
349+
{
350+
// Check if the hole is due to irrelevant changes removed without informing the reader
351+
if (gap_seq == first_irrelevant_removed_)
352+
{
353+
first_irrelevant_removed_ = SequenceNumber_t::unknown();
354+
last_irrelevant_removed_ = SequenceNumber_t::unknown();
355+
}
356+
else if (gap_seq < last_irrelevant_removed_)
357+
{
358+
last_irrelevant_removed_ = gap_seq - 1;
359+
}
360+
}
331361
}
332362
}
333363
}
@@ -436,6 +466,20 @@ bool ReaderProxy::requested_changes_set(
436466
else if ((sit >= min_seq_in_history) && (sit > changes_low_mark_))
437467
{
438468
gap_builder.add(sit);
469+
470+
if (SequenceNumber_t::unknown() != first_irrelevant_removed_)
471+
{
472+
// Check if the hole is due to irrelevant changes removed without informing the reader
473+
if (sit == first_irrelevant_removed_)
474+
{
475+
first_irrelevant_removed_ = SequenceNumber_t::unknown();
476+
last_irrelevant_removed_ = SequenceNumber_t::unknown();
477+
}
478+
else if (sit < last_irrelevant_removed_)
479+
{
480+
last_irrelevant_removed_ = sit - 1;
481+
}
482+
}
439483
}
440484
});
441485
}

src/cpp/rtps/writer/StatefulWriter.cpp

Lines changed: 46 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -577,11 +577,12 @@ bool StatefulWriter::change_removed_by_history(
577577
return ret_value;
578578
}
579579

580-
void StatefulWriter::send_heartbeat_to_all_readers()
580+
void StatefulWriter::send_heartbeat_to_all_readers(
581+
bool force_separating)
581582
{
582583
// This method is only called from send_periodic_heartbeat
583584

584-
if (m_separateSendingEnabled)
585+
if (m_separateSendingEnabled || force_separating)
585586
{
586587
for (ReaderProxy* reader : matched_remote_readers_)
587588
{
@@ -705,6 +706,15 @@ DeliveryRetCode StatefulWriter::deliver_sample_to_network(
705706
{
706707
SequenceNumber_t gap_seq;
707708
FragmentNumber_t next_unsent_frag = 0;
709+
710+
if (SequenceNumber_t::unknown() != (*remote_reader)->first_irrelevant_removed())
711+
{
712+
// Send GAP with irrelevant changes that are not in history.
713+
group.sender(this, (*remote_reader)->message_sender());
714+
add_gaps_for_removed_irrelevants(**remote_reader, group);
715+
group.sender(this, &locator_selector); // This makes the flush_and_reset().
716+
}
717+
708718
if ((*remote_reader)->change_is_unsent(change->sequenceNumber, next_unsent_frag, gap_seq, get_seq_num_min(),
709719
need_reactivate_periodic_heartbeat) &&
710720
(0 == n_fragments || min_unsent_fragment >= next_unsent_frag))
@@ -1730,7 +1740,8 @@ bool StatefulWriter::send_periodic_heartbeat(
17301740
std::lock_guard<RecursiveTimedMutex> guardW(mp_mutex);
17311741
std::lock_guard<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);
17321742

1733-
bool unacked_changes = false;
1743+
bool unacked_changes {false};
1744+
bool irrelevants_removed {false};
17341745
if (!liveliness)
17351746
{
17361747
SequenceNumber_t first_seq_to_check_acknowledge = get_seq_num_min();
@@ -1739,20 +1750,30 @@ bool StatefulWriter::send_periodic_heartbeat(
17391750
first_seq_to_check_acknowledge = mp_history->next_sequence_number() - 1;
17401751
}
17411752

1742-
unacked_changes = for_matched_readers(matched_local_readers_, matched_datasharing_readers_,
1743-
matched_remote_readers_,
1744-
[first_seq_to_check_acknowledge](ReaderProxy* reader)
1745-
{
1746-
return reader->has_unacknowledged(first_seq_to_check_acknowledge);
1747-
}
1748-
);
1753+
for_matched_readers(matched_local_readers_, matched_datasharing_readers_,
1754+
matched_remote_readers_,
1755+
[first_seq_to_check_acknowledge, &unacked_changes, &irrelevants_removed](ReaderProxy* reader)
1756+
{
1757+
if (!unacked_changes)
1758+
{
1759+
unacked_changes = reader->has_unacknowledged(first_seq_to_check_acknowledge);
1760+
}
1761+
1762+
if (!irrelevants_removed)
1763+
{
1764+
irrelevants_removed = SequenceNumber_t::unknown() != reader->first_irrelevant_removed();
1765+
}
1766+
1767+
return unacked_changes && irrelevants_removed;
1768+
}
1769+
);
17491770

17501771
if (unacked_changes)
17511772
{
17521773
try
17531774
{
17541775
//TODO if separating, here sends periodic for all readers, instead of ones needed it.
1755-
send_heartbeat_to_all_readers();
1776+
send_heartbeat_to_all_readers(irrelevants_removed);
17561777
}
17571778
catch (const RTPSMessageGroup::timeout&)
17581779
{
@@ -1842,6 +1863,7 @@ void StatefulWriter::send_heartbeat_to_nts(
18421863
assert(firstSeq <= lastSeq);
18431864
if (!liveliness)
18441865
{
1866+
add_gaps_for_removed_irrelevants(remoteReaderProxy, group);
18451867
add_gaps_for_holes_in_history_(group);
18461868
}
18471869
}
@@ -2293,6 +2315,19 @@ bool StatefulWriter::get_connections(
22932315

22942316
#endif // ifdef FASTDDS_STATISTICS
22952317

2318+
void StatefulWriter::add_gaps_for_removed_irrelevants(
2319+
ReaderProxy& remoteReaderProxy,
2320+
RTPSMessageGroup& group)
2321+
{
2322+
if (SequenceNumber_t::unknown() != remoteReaderProxy.first_irrelevant_removed())
2323+
{
2324+
group.add_gap(remoteReaderProxy.first_irrelevant_removed(),
2325+
SequenceNumberSet_t(remoteReaderProxy.last_irrelevant_removed() + 1),
2326+
remoteReaderProxy.guid());
2327+
remoteReaderProxy.reset_irrelevant_removed();
2328+
}
2329+
}
2330+
22962331
void StatefulWriter::add_gaps_for_holes_in_history_(
22972332
RTPSMessageGroup& group)
22982333
{

0 commit comments

Comments
 (0)