@@ -5415,6 +5415,64 @@ rd_kafka_cgrp_subscription_set(rd_kafka_cgrp_t *rkcg,
54155415 return rd_atomic32_add (& rkcg -> rkcg_subscription_version , 1 );
54165416}
54175417
5418+ static rd_bool_t
5419+ rd_kafka_cgrp_stale_subscribed_topics_cache (rd_kafka_cgrp_t * rkcg ) {
5420+ rd_bool_t stale = rd_false ;
5421+ int32_t current_subscription_version =
5422+ rd_atomic32_get (& rkcg -> rkcg_subscription_version );
5423+ rd_ts_t current_ts_metadata = rkcg -> rkcg_rk -> rk_ts_metadata ;
5424+
5425+ stale = rkcg -> rkcg_subscribed_topics_cache .rk_ts_metadata !=
5426+ current_ts_metadata ||
5427+ rkcg -> rkcg_subscribed_topics_cache .subscription_version !=
5428+ current_subscription_version ;
5429+ if (stale ) {
5430+ rkcg -> rkcg_subscribed_topics_cache .rk_ts_metadata =
5431+ current_ts_metadata ;
5432+ rkcg -> rkcg_subscribed_topics_cache .subscription_version =
5433+ current_subscription_version ;
5434+ }
5435+ return stale ;
5436+ }
5437+
5438+ static rd_bool_t
5439+ rd_kafka_cgrp_set_subscribed_topics_from_subscription (rd_kafka_cgrp_t * rkcg ) {
5440+ rd_list_t * tinfos ;
5441+ rd_kafka_topic_partition_list_t * errored ;
5442+
5443+ if (!rd_kafka_cgrp_stale_subscribed_topics_cache (rkcg ))
5444+ return rd_false ; /* Not stale, no change */
5445+
5446+ /*
5447+ * Unmatched topics will be added to the errored list.
5448+ */
5449+ errored = rd_kafka_topic_partition_list_new (0 );
5450+
5451+ /*
5452+ * Create a list of the topics in metadata that matches our subscription
5453+ */
5454+ tinfos = rd_list_new (rkcg -> rkcg_subscription -> cnt ,
5455+ rd_kafka_topic_info_destroy_free );
5456+
5457+ if (rkcg -> rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION )
5458+ rd_kafka_metadata_topic_match (rkcg -> rkcg_rk , tinfos ,
5459+ rkcg -> rkcg_subscription , errored );
5460+ else
5461+ rd_kafka_metadata_topic_filter (
5462+ rkcg -> rkcg_rk , tinfos , rkcg -> rkcg_subscription , errored );
5463+
5464+ /*
5465+ * Propagate consumer errors for any non-existent or errored topics.
5466+ * The function takes ownership of errored.
5467+ */
5468+ rd_kafka_propagate_consumer_topic_errors (
5469+ rkcg , errored , "Subscribed topic not available" );
5470+
5471+ /*
5472+ * Update effective list of topics (takes ownership of \c tinfos)
5473+ */
5474+ return rd_kafka_cgrp_update_subscribed_topics (rkcg , tinfos );
5475+ }
54185476
54195477/**
54205478 * @brief Handle a new subscription that is modifying an existing subscription
@@ -5427,11 +5485,10 @@ rd_kafka_cgrp_modify_subscription(rd_kafka_cgrp_t *rkcg,
54275485 rd_kafka_topic_partition_list_t * rktparlist ) {
54285486 rd_kafka_topic_partition_list_t * unsubscribing_topics ;
54295487 rd_kafka_topic_partition_list_t * revoking ;
5430- rd_list_t * tinfos ;
5431- rd_kafka_topic_partition_list_t * errored ;
54325488 int metadata_age ;
54335489 int old_cnt = rkcg -> rkcg_subscription -> cnt ;
54345490 int32_t cgrp_subscription_version ;
5491+ rd_bool_t changed = rd_false ;
54355492
54365493 /* Topics in rkcg_subscribed_topics that don't match any pattern in
54375494 the new subscription. */
@@ -5486,27 +5543,10 @@ rd_kafka_cgrp_modify_subscription(rd_kafka_cgrp_t *rkcg,
54865543 if (unsubscribing_topics )
54875544 rd_kafka_topic_partition_list_destroy (unsubscribing_topics );
54885545
5489- /* Create a list of the topics in metadata that matches the new
5490- * subscription */
5491- tinfos = rd_list_new (rkcg -> rkcg_subscription -> cnt ,
5492- rd_kafka_topic_info_destroy_free );
5493-
5494- /* Unmatched topics will be added to the errored list. */
5495- errored = rd_kafka_topic_partition_list_new (0 );
54965546
5497- if (rkcg -> rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION )
5498- rd_kafka_metadata_topic_match (rkcg -> rkcg_rk , tinfos ,
5499- rkcg -> rkcg_subscription , errored );
5500- else
5501- rd_kafka_metadata_topic_filter (
5502- rkcg -> rkcg_rk , tinfos , rkcg -> rkcg_subscription , errored );
5503-
5504- /* Propagate consumer errors for any non-existent or errored topics.
5505- * The function takes ownership of errored. */
5506- rd_kafka_propagate_consumer_topic_errors (
5507- rkcg , errored , "Subscribed topic not available" );
5547+ changed = rd_kafka_cgrp_set_subscribed_topics_from_subscription (rkcg );
55085548
5509- if (rd_kafka_cgrp_update_subscribed_topics ( rkcg , tinfos ) && !revoking ) {
5549+ if (changed && !revoking ) {
55105550 rd_kafka_cgrp_rejoin (rkcg , "Subscription modified" );
55115551 return RD_KAFKA_RESP_ERR_NO_ERROR ;
55125552 }
@@ -6900,7 +6940,6 @@ rd_kafka_cgrp_owned_but_not_exist_partitions(rd_kafka_cgrp_t *rkcg) {
69006940 return result ;
69016941}
69026942
6903-
69046943/**
69056944 * @brief Check if the latest metadata affects the current subscription:
69066945 * - matched topic added
@@ -6912,9 +6951,7 @@ rd_kafka_cgrp_owned_but_not_exist_partitions(rd_kafka_cgrp_t *rkcg) {
69126951 */
69136952void rd_kafka_cgrp_metadata_update_check (rd_kafka_cgrp_t * rkcg ,
69146953 rd_bool_t do_join ) {
6915- rd_list_t * tinfos ;
6916- rd_kafka_topic_partition_list_t * errored ;
6917- rd_bool_t changed ;
6954+ rd_bool_t changed = rd_false ;
69186955
69196956 rd_kafka_assert (NULL , thrd_is_current (rkcg -> rkcg_rk -> rk_thread ));
69206957
@@ -6924,36 +6961,7 @@ void rd_kafka_cgrp_metadata_update_check(rd_kafka_cgrp_t *rkcg,
69246961 if (!rkcg -> rkcg_subscription || rkcg -> rkcg_subscription -> cnt == 0 )
69256962 return ;
69266963
6927- /*
6928- * Unmatched topics will be added to the errored list.
6929- */
6930- errored = rd_kafka_topic_partition_list_new (0 );
6931-
6932- /*
6933- * Create a list of the topics in metadata that matches our subscription
6934- */
6935- tinfos = rd_list_new (rkcg -> rkcg_subscription -> cnt ,
6936- rd_kafka_topic_info_destroy_free );
6937-
6938- if (rkcg -> rkcg_flags & RD_KAFKA_CGRP_F_WILDCARD_SUBSCRIPTION )
6939- rd_kafka_metadata_topic_match (rkcg -> rkcg_rk , tinfos ,
6940- rkcg -> rkcg_subscription , errored );
6941- else
6942- rd_kafka_metadata_topic_filter (
6943- rkcg -> rkcg_rk , tinfos , rkcg -> rkcg_subscription , errored );
6944-
6945-
6946- /*
6947- * Propagate consumer errors for any non-existent or errored topics.
6948- * The function takes ownership of errored.
6949- */
6950- rd_kafka_propagate_consumer_topic_errors (
6951- rkcg , errored , "Subscribed topic not available" );
6952-
6953- /*
6954- * Update effective list of topics (takes ownership of \c tinfos)
6955- */
6956- changed = rd_kafka_cgrp_update_subscribed_topics (rkcg , tinfos );
6964+ changed = rd_kafka_cgrp_set_subscribed_topics_from_subscription (rkcg );
69576965
69586966 if (!do_join ||
69596967 (!changed &&
0 commit comments