Skip to content

Commit 154951b

Browse files
richiwaremergify[bot]
authored andcommitted
Fix GAP messages are not sent when there is no Reader requesting the DATA (#6181)
* Refs #23919. Tests Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #23919. Fix Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #23919. Fix cornercase Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #23919. Apply suggestions Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #23919. Fix jobs needs log Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> * Refs #23919. Apply suggestions Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> --------- Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev> (cherry picked from commit 062258c) # Conflicts: # examples/cpp/content_filter/SubscriberApp.hpp # src/cpp/rtps/writer/StatefulWriter.cpp # test/blackbox/common/DDSBlackboxTestsContentFilter.cpp # test/dds/xtypes/TypeLookupServiceSubscriber.cpp
1 parent c047a31 commit 154951b

File tree

8 files changed

+993
-20
lines changed

8 files changed

+993
-20
lines changed

.github/workflows/reusable-sanitizers-ci.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ jobs:
221221
continue-on-error: true
222222
uses: eProsima/eProsima-CI/multiplatform/colcon_test@v0
223223
with:
224+
colcon_args_default: ''
224225
colcon_test_args: ${{ inputs.colcon_test_args }}
225226
colcon_test_args_default: '--event-handlers=console_direct+ --return-code-on-test-failure'
226227
ctest_args: ${{ inputs.ctest_args }}
@@ -319,6 +320,7 @@ jobs:
319320
continue-on-error: true
320321
uses: eProsima/eProsima-CI/multiplatform/colcon_test@v0
321322
with:
323+
colcon_args_default: ''
322324
colcon_test_args: ${{ inputs.colcon_test_args }}
323325
colcon_test_args_default: '--event-handlers=console_direct+ --return-code-on-test-failure'
324326
ctest_args: ${{ inputs.ctest_args }}
@@ -464,6 +466,7 @@ jobs:
464466
continue-on-error: true
465467
uses: eProsima/eProsima-CI/multiplatform/colcon_test@v0
466468
with:
469+
colcon_args_default: ''
467470
colcon_test_args: ${{ inputs.colcon_test_args }}
468471
colcon_test_args_default: '--event-handlers=console_direct+ --return-code-on-test-failure'
469472
ctest_args: ${{ inputs.ctest_args }}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/**
16+
* @file SubscriberApp.hpp
17+
*
18+
*/
19+
20+
#ifndef FASTDDS_EXAMPLES_CPP_CONTENT_FILTER__SUBSCRIBERAPP_HPP
21+
#define FASTDDS_EXAMPLES_CPP_CONTENT_FILTER__SUBSCRIBERAPP_HPP
22+
23+
#include <atomic>
24+
#include <condition_variable>
25+
26+
#include <fastdds/dds/core/status/SubscriptionMatchedStatus.hpp>
27+
#include <fastdds/dds/domain/DomainParticipant.hpp>
28+
#include <fastdds/dds/subscriber/DataReader.hpp>
29+
#include <fastdds/dds/subscriber/DataReaderListener.hpp>
30+
#include <fastdds/dds/subscriber/SampleInfo.hpp>
31+
#include <fastdds/dds/subscriber/Subscriber.hpp>
32+
#include <fastdds/dds/topic/ContentFilteredTopic.hpp>
33+
#include <fastdds/dds/topic/Topic.hpp>
34+
#include <fastdds/dds/topic/TypeSupport.hpp>
35+
36+
#include "Application.hpp"
37+
#include "CLIParser.hpp"
38+
#include "CustomContentFilterFactory.hpp"
39+
#include "HelloWorld.hpp"
40+
41+
using namespace eprosima::fastdds::dds;
42+
namespace eprosima {
43+
namespace fastdds {
44+
namespace examples {
45+
namespace content_filter {
46+
class SubscriberApp : public Application, public DataReaderListener
47+
{
48+
public:
49+
50+
//! Constructor
51+
SubscriberApp(
52+
const CLIParser::subscriber_config& config,
53+
const std::string& topic_name);
54+
55+
//! Destructor
56+
~SubscriberApp();
57+
58+
//! Subscription callback
59+
void on_data_available(
60+
DataReader* reader) override;
61+
62+
//! Subscriber matched method
63+
void on_subscription_matched(
64+
DataReader* reader,
65+
const SubscriptionMatchedStatus& info) override;
66+
67+
void on_sample_lost(
68+
DataReader*,
69+
const SampleLostStatus& status) override
70+
{
71+
std::cout << "Sample lost: " << status.total_count << std::endl;
72+
}
73+
74+
//! Run subscriber
75+
void run() override;
76+
77+
//! Trigger the end of execution
78+
void stop() override;
79+
80+
private:
81+
82+
//! Return the current state of execution
83+
bool is_stopped();
84+
85+
HelloWorld hello_;
86+
87+
DomainParticipant* participant_;
88+
89+
Subscriber* subscriber_;
90+
91+
Topic* topic_;
92+
93+
DataReader* reader_;
94+
95+
TypeSupport type_;
96+
97+
uint16_t received_samples_;
98+
99+
uint16_t samples_;
100+
101+
//! DDS ContentFilteredTopic pointer
102+
ContentFilteredTopic* filter_topic_;
103+
104+
//! Custom filter factory
105+
CustomContentFilterFactory filter_factory;
106+
107+
std::atomic<bool> stop_;
108+
109+
mutable std::mutex terminate_cv_mtx_;
110+
111+
std::condition_variable terminate_cv_;
112+
};
113+
114+
} // namespace content_filter
115+
} // namespace examples
116+
} // namespace fastdds
117+
} // namespace eprosima
118+
119+
#endif // FASTDDS_EXAMPLES_CPP_CONTENT_FILTER__SUBSCRIBERAPP_HPP

include/fastdds/rtps/writer/ReaderProxy.h

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ class ReaderProxy
143143
FragmentNumber_t& next_unsent_frag,
144144
SequenceNumber_t& gap_seq,
145145
const SequenceNumber_t& min_seq,
146-
bool& need_reactivate_periodic_heartbeat) const;
146+
bool& need_reactivate_periodic_heartbeat);
147147

148148
/**
149149
* Mark all changes up to the one indicated by seq_num as Acknowledged.
@@ -345,11 +345,38 @@ class ReaderProxy
345345
* Get the highest fully acknowledged sequence number.
346346
* @return the highest fully acknowledged sequence number.
347347
*/
348-
SequenceNumber_t changes_low_mark() const
348+
inline SequenceNumber_t changes_low_mark() const
349349
{
350350
return changes_low_mark_;
351351
}
352352

353+
/*!
354+
* Get the first sequence number not relevant that was removed without reader being informed.
355+
* @return First sequence number.
356+
*/
357+
inline SequenceNumber_t first_irrelevant_removed() const
358+
{
359+
return first_irrelevant_removed_;
360+
}
361+
362+
/*!
363+
* Get the last sequence number not relevant that was removed without reader being informed.
364+
* @return last sequence number.
365+
*/
366+
inline SequenceNumber_t last_irrelevant_removed() const
367+
{
368+
return last_irrelevant_removed_;
369+
}
370+
371+
/*!
372+
* Reset the interval of sequence numbers not relevant that were removed without reader being informed.
373+
*/
374+
inline void reset_irrelevant_removed()
375+
{
376+
first_irrelevant_removed_ = SequenceNumber_t::unknown();
377+
last_irrelevant_removed_ = SequenceNumber_t::unknown();
378+
}
379+
353380
/**
354381
* Change the interval of nack-supression event.
355382
* @param interval Time from data sending to acknack processing.
@@ -450,8 +477,14 @@ class ReaderProxy
450477
//! Last NACKFRAG count.
451478
uint32_t last_nackfrag_count_;
452479

480+
//! Sequence number of the lowest change not fully acknowledged.
453481
SequenceNumber_t changes_low_mark_;
454482

483+
//! First sequence number not relevant that was removed without reader being informed.
484+
SequenceNumber_t first_irrelevant_removed_ {SequenceNumber_t::unknown()};
485+
//! Last sequence number not relevant that was removed without reader being informed.
486+
SequenceNumber_t last_irrelevant_removed_ {SequenceNumber_t::unknown()};
487+
455488
bool active_ = false;
456489

457490
using ChangeIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::iterator;

include/fastdds/rtps/writer/StatefulWriter.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,13 @@ class StatefulWriter : public RTPSWriter
486486
*/
487487
bool ack_timer_expired();
488488

489-
void send_heartbeat_to_all_readers();
489+
/*!
490+
* Send heartbeat to all the remote readers.
491+
* @param force_separating True to send the heartbeat separately for each reader.
492+
* False to send a unique heartbeat to all the readers.
493+
*/
494+
void send_heartbeat_to_all_readers(
495+
bool force_separating);
490496

491497
void deliver_sample_to_intraprocesses(
492498
CacheChange_t* change);
@@ -503,6 +509,10 @@ class StatefulWriter : public RTPSWriter
503509
void prepare_datasharing_delivery(
504510
CacheChange_t* change);
505511

512+
void add_gaps_for_removed_irrelevants(
513+
ReaderProxy& remoteReaderProxy,
514+
RTPSMessageGroup& group);
515+
506516
/**
507517
* Check the StatefulWriter's sequence numbers and add the required GAP messages to the provided message group.
508518
*

src/cpp/rtps/writer/ReaderProxy.cpp

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -227,25 +227,40 @@ void ReaderProxy::add_change(
227227
const ChangeForReader_t& change,
228228
bool is_relevant)
229229
{
230-
assert(change.getSequenceNumber() > changes_low_mark_);
230+
SequenceNumber_t seq_num {change.getSequenceNumber()};
231+
assert(seq_num > changes_low_mark_);
231232
assert(changes_for_reader_.empty() ? true :
232-
change.getSequenceNumber() > changes_for_reader_.back().getSequenceNumber());
233+
seq_num > changes_for_reader_.back().getSequenceNumber());
233234

234235
// Irrelevant changes are not added to the collection
235236
if (!is_relevant)
236237
{
237-
if ( !is_reliable_ &&
238-
changes_low_mark_ + 1 == change.getSequenceNumber())
238+
if (is_reliable_)
239+
{
240+
if (!is_local_reader())
241+
{
242+
if (SequenceNumber_t::unknown() == first_irrelevant_removed_)
243+
{
244+
first_irrelevant_removed_ = seq_num;
245+
last_irrelevant_removed_ = seq_num;
246+
}
247+
else if (seq_num == last_irrelevant_removed_ + 1)
248+
{
249+
last_irrelevant_removed_ = seq_num;
250+
}
251+
}
252+
}
253+
else if (changes_low_mark_ + 1 == seq_num)
239254
{
240-
changes_low_mark_ = change.getSequenceNumber();
255+
changes_low_mark_ = seq_num;
241256
}
242257
return;
243258
}
244259

245260
if (changes_for_reader_.push_back(change) == nullptr)
246261
{
247262
// This should never happen
248-
EPROSIMA_LOG_ERROR(RTPS_READER_PROXY, "Error adding change " << change.getSequenceNumber()
263+
EPROSIMA_LOG_ERROR(RTPS_READER_PROXY, "Error adding change " << seq_num
249264
<< " to reader proxy " << guid());
250265
eprosima::fastdds::dds::Log::Flush();
251266
assert(false);
@@ -281,7 +296,7 @@ bool ReaderProxy::change_is_unsent(
281296
FragmentNumber_t& next_unsent_frag,
282297
SequenceNumber_t& gap_seq,
283298
const SequenceNumber_t& min_seq,
284-
bool& need_reactivate_periodic_heartbeat) const
299+
bool& need_reactivate_periodic_heartbeat)
285300
{
286301
if (seq_num <= changes_low_mark_ || changes_for_reader_.empty())
287302
{
@@ -328,6 +343,21 @@ bool ReaderProxy::change_is_unsent(
328343
gap_seq = SequenceNumber_t::unknown();
329344
}
330345
}
346+
347+
if (SequenceNumber_t::unknown() != first_irrelevant_removed_ &&
348+
SequenceNumber_t::unknown() != gap_seq)
349+
{
350+
// Check if the hole is due to irrelevant changes removed without informing the reader
351+
if (gap_seq == first_irrelevant_removed_)
352+
{
353+
first_irrelevant_removed_ = SequenceNumber_t::unknown();
354+
last_irrelevant_removed_ = SequenceNumber_t::unknown();
355+
}
356+
else if (gap_seq < last_irrelevant_removed_)
357+
{
358+
last_irrelevant_removed_ = gap_seq - 1;
359+
}
360+
}
331361
}
332362
}
333363
}
@@ -436,6 +466,20 @@ bool ReaderProxy::requested_changes_set(
436466
else if ((sit >= min_seq_in_history) && (sit > changes_low_mark_))
437467
{
438468
gap_builder.add(sit);
469+
470+
if (SequenceNumber_t::unknown() != first_irrelevant_removed_)
471+
{
472+
// Check if the hole is due to irrelevant changes removed without informing the reader
473+
if (sit == first_irrelevant_removed_)
474+
{
475+
first_irrelevant_removed_ = SequenceNumber_t::unknown();
476+
last_irrelevant_removed_ = SequenceNumber_t::unknown();
477+
}
478+
else if (sit < last_irrelevant_removed_)
479+
{
480+
last_irrelevant_removed_ = sit - 1;
481+
}
482+
}
439483
}
440484
});
441485
}

0 commit comments

Comments
 (0)