Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,10 @@ public interface ConfigurationOptions {
String DORIS_SINK_TASK_USE_REPARTITION = "doris.sink.task.use.repartition";

boolean DORIS_SINK_TASK_USE_REPARTITION_DEFAULT = false;

// Make each partition commit only one StreamLoad task, so that a task retry after a failure does not re-commit a StreamLoad task for the same batch of data.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please write this comment in English.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I will change it to English.

String DORIS_SINK_PER_PARTITION_TASK_ATOMICITY = "doris.sink.per.partition.task.atomicity";

boolean DORIS_SINK_PER_PARTITION_TASK_ATOMICITY_DEFAULT = false;

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
val maxRetryTimes = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
val sinkTaskPartitionSize = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
val sinkTaskUseRepartition = sparkSettings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
val partitionTaskAtomicity = settings.getProperty(ConfigurationOptions.DORIS_SINK_PER_PARTITION_TASK_ATOMICITY, ConfigurationOptions.DORIS_SINK_PER_PARTITION_TASK_ATOMICITY_DEFAULT.toString).toBoolean

logger.info(s"maxRowCount ${maxRowCount}")
logger.info(s"maxRetryTimes ${maxRetryTimes}")
Expand All @@ -83,10 +84,11 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
line.add(field.asInstanceOf[AnyRef])
}
rowsBuffer.add(line)
if (rowsBuffer.size > maxRowCount) {
if (!partitionTaskAtomicity && rowsBuffer.size > maxRowCount) {
flush
}
})
flush
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this flush deal with?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This flush allows each partition to commit only one StreamLoad task when the partitionTaskAtomicity parameter is true, but it seems redundant because the code already performs the `if (!rowArray.isEmpty) { flush }` operation.

// flush buffer
if (!rowsBuffer.isEmpty) {
flush
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
val sinkTaskPartitionSize = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
val sinkTaskUseRepartition = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
val partitionTaskAtomicity = settings.getProperty(ConfigurationOptions.DORIS_SINK_PER_PARTITION_TASK_ATOMICITY, ConfigurationOptions.DORIS_SINK_PER_PARTITION_TASK_ATOMICITY_DEFAULT.toString).toBoolean
val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings)

override def addBatch(batchId: Long, data: DataFrame): Unit = {
Expand Down Expand Up @@ -70,10 +71,12 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
}
}
rowArray.add(rowNode)
if (rowArray.size > maxRowCount - 1) {
if (!partitionTaskAtomicity && rowArray.size > maxRowCount - 1) {
flush
}
})
flush

// flush buffer
if (!rowArray.isEmpty) {
flush
Expand Down