From 3a1aed99878a009c5f5140792ecd534c1ff5b363 Mon Sep 17 00:00:00 2001
From: baishao_de <1025869797@qq.com>
Date: Thu, 6 Apr 2023 08:48:15 +0800
Subject: [PATCH 1/8] Add an atomicity control parameter for partition tasks.
 The parameter defaults to false. If false, each partition may run StreamLoad
 multiple times; if true, each partition is limited to a single StreamLoad,
 i.e. all data of a partition is written to Doris through one StreamLoad task.
 This avoids submitting the same data to multiple StreamLoad tasks when a
 failed task is retried.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../org/apache/doris/spark/cfg/ConfigurationOptions.java | 6 ++++++
 .../org/apache/doris/spark/sql/DorisStreamLoadSink.scala | 5 ++++-
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 5ef9f193..c298bbb7 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -85,4 +85,10 @@ public interface ConfigurationOptions {
     String DORIS_SINK_TASK_USE_REPARTITION = "doris.sink.task.use.repartition";
 
     boolean DORIS_SINK_TASK_USE_REPARTITION_DEFAULT = false;
+
+    //Set only one StreamLoad task to be submitted per partition to ensure that task retries do not result in repeated submission of StreamLoad tasks on the same batch of data if the task fails.
+    String DORIS_SINK_PER_PARTITION_TASK_ATOMICITY = "doris.sink.per.partition.task.atomicity";
+
+    boolean DORIS_SINK_PER_PARTITION_TASK_ATOMICITY_DEFAULT = false;
+
 }

diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index 130ce218..457e1428 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -37,6 +37,7 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
   val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
   val sinkTaskPartitionSize = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
   val sinkTaskUseRepartition = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
+  val partitionTaskAtomicity = settings.getProperty(ConfigurationOptions.DORIS_SINK_PER_PARTITION_TASK_ATOMICITY, ConfigurationOptions.DORIS_SINK_PER_PARTITION_TASK_ATOMICITY_DEFAULT.toString).toBoolean
   val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings)
 
   override def addBatch(batchId: Long, data: DataFrame): Unit = {
@@ -70,10 +71,12 @@
         }
       }
       rowArray.add(rowNode)
-      if (rowArray.size > maxRowCount - 1) {
+      if (!partitionTaskAtomicity & rowArray.size > maxRowCount - 1) {
         flush
       }
     })
+    flush
+
     // flush buffer
     if (!rowArray.isEmpty) {
       flush
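For reference, the option introduced above is set like any other connector option on the write path. The following is a minimal usage sketch, not part of the patch itself; the FE address, table identifier, and credentials are placeholder assumptions:

    // Sketch: enable per-partition StreamLoad atomicity on a batch write.
    // The option key comes from ConfigurationOptions above; all other
    // values here are illustrative placeholders, not part of the patch.
    df.write
      .format("doris")
      .option("doris.fenodes", "127.0.0.1:8030")
      .option("doris.table.identifier", "example_db.example_table")
      .option("user", "root")
      .option("password", "")
      .option("doris.sink.per.partition.task.atomicity", "true")
      .mode("append")
      .save()

With the option left at its default of false, the sink keeps the existing behavior of flushing each partition in multiple batches.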
From 9b0efff9342f079c7fd77758f05f88c3f6364197 Mon Sep 17 00:00:00 2001
From: baishao_de <1025869797@qq.com>
Date: Wed, 17 May 2023 20:08:37 +0800
Subject: [PATCH 2/8] fix

---
 .../org/apache/doris/spark/sql/DorisSourceProvider.scala | 4 +++-
 .../org/apache/doris/spark/sql/DorisStreamLoadSink.scala | 2 +-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index 2922d638..2cba0d57 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -65,6 +65,7 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
     val maxRetryTimes = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT)
     val sinkTaskPartitionSize = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
     val sinkTaskUseRepartition = sparkSettings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
+    val partitionTaskAtomicity = sparkSettings.getProperty(ConfigurationOptions.DORIS_SINK_PER_PARTITION_TASK_ATOMICITY, ConfigurationOptions.DORIS_SINK_PER_PARTITION_TASK_ATOMICITY_DEFAULT.toString).toBoolean
 
     logger.info(s"maxRowCount ${maxRowCount}")
     logger.info(s"maxRetryTimes ${maxRetryTimes}")
@@ -83,10 +84,11 @@ private[sql] class DorisSourceProvider extends DataSourceRegister
           line.add(field.asInstanceOf[AnyRef])
         }
         rowsBuffer.add(line)
-        if (rowsBuffer.size > maxRowCount) {
+        if (!partitionTaskAtomicity && rowsBuffer.size > maxRowCount) {
           flush
         }
       })
+      flush
       // flush buffer
       if (!rowsBuffer.isEmpty) {
         flush

diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
index 457e1428..ffebaf6f 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala
@@ -71,7 +71,7 @@ private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSe
         }
       }
       rowArray.add(rowNode)
-      if (!partitionTaskAtomicity & rowArray.size > maxRowCount - 1) {
+      if (!partitionTaskAtomicity && rowArray.size > maxRowCount - 1) {
         flush
       }
     })

From 4538a430e3321be4949a9637f35bb163e5f1256f Mon Sep 17 00:00:00 2001
From: gnehil
Date: Tue, 30 May 2023 22:55:04 +0800
Subject: [PATCH 3/8] [refactor] Unified writing through DorisWriter (#104)

* use writer to write data

* resolve conflicts

* unify jackson version

* remove useless code
---
 spark-doris-connector/pom.xml                             |  2 +-
 .../CachedDorisStreamLoadClient.java                      |  7 +-
 .../spark/{ => load}/DorisStreamLoad.java                 | 39 ++++-----
 .../doris/spark/sql/DorisSourceProvider.scala             | 62 +-------------
.../doris/spark/sql/DorisStreamLoadSink.scala | 52 ++---------- .../org/apache/doris/spark/sql/Utils.scala | 6 +- .../doris/spark/writer/DorisWriter.scala | 81 +++++++++++++++++++ 7 files changed, 116 insertions(+), 133 deletions(-) rename spark-doris-connector/src/main/java/org/apache/doris/spark/{ => load}/CachedDorisStreamLoadClient.java (90%) rename spark-doris-connector/src/main/java/org/apache/doris/spark/{ => load}/DorisStreamLoad.java (96%) create mode 100644 spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml index 77b37c16..e4b4c8b9 100644 --- a/spark-doris-connector/pom.xml +++ b/spark-doris-connector/pom.xml @@ -70,7 +70,7 @@ UTF-8 github 4.1.77.Final - 2.13.3 + 2.10.5 1.0.0 diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java similarity index 90% rename from spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java rename to spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java index 1d891261..d3dab491 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/CachedDorisStreamLoadClient.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java @@ -15,17 +15,12 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.spark; +package org.apache.doris.spark.load; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; import org.apache.doris.spark.cfg.SparkSettings; -import org.apache.doris.spark.exception.DorisException; - -import java.io.IOException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java similarity index 96% rename from spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java rename to spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java index 6738c099..61379e36 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java @@ -14,15 +14,8 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. 
-package org.apache.doris.spark; +package org.apache.doris.spark.load; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import org.apache.commons.collections.MapUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.doris.spark.cfg.ConfigurationOptions; import org.apache.doris.spark.cfg.SparkSettings; import org.apache.doris.spark.exception.StreamLoadException; @@ -30,6 +23,14 @@ import org.apache.doris.spark.rest.models.BackendV2; import org.apache.doris.spark.rest.models.RespContent; import org.apache.doris.spark.util.ListUtils; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpHeaders; import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; @@ -45,10 +46,17 @@ import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; -import java.sql.Date; import java.sql.Timestamp; -import java.text.SimpleDateFormat; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Base64; +import java.util.Calendar; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -73,13 +81,11 @@ public class DorisStreamLoad implements Serializable { private String tbl; private String authEncoded; private String columns; - private String[] dfColumns; private String maxFilterRatio; private Map streamLoadProp; private static final long cacheExpireTimeout = 4 * 60; private final LoadingCache> cache; private final String fileType; - private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS"); public DorisStreamLoad(SparkSettings settings) { String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\."); @@ -101,11 +107,6 @@ public DorisStreamLoad(SparkSettings settings) { } } - public DorisStreamLoad(SparkSettings settings, String[] dfColumns) { - this(settings); - this.dfColumns = dfColumns; - } - public String getLoadUrlStr() { if (StringUtils.isEmpty(loadUrlStr)) { return ""; @@ -168,7 +169,7 @@ public String listToString(List> rows) { } - public void loadV2(List> rows) throws StreamLoadException, JsonProcessingException { + public void loadV2(List> rows, String[] dfColumns) throws StreamLoadException, JsonProcessingException { if (fileType.equals("csv")) { load(listToString(rows)); } else if(fileType.equals("json")) { diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala index e469f38a..94fab9e6 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala @@ -17,9 +17,9 @@ package org.apache.doris.spark.sql -import 
org.apache.doris.spark.DorisStreamLoad -import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings} +import org.apache.doris.spark.cfg.SparkSettings import org.apache.doris.spark.sql.DorisSourceProvider.SHORT_NAME +import org.apache.doris.spark.writer.DorisWriter import org.apache.spark.SparkConf import org.apache.spark.sql.execution.streaming.Sink import org.apache.spark.sql.sources._ @@ -28,12 +28,7 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} import org.slf4j.{Logger, LoggerFactory} -import java.io.IOException -import java.time.Duration -import java.util -import java.util.Objects import scala.collection.JavaConverters.mapAsJavaMapConverter -import scala.util.{Failure, Success} private[sql] class DorisSourceProvider extends DataSourceRegister with RelationProvider @@ -60,58 +55,9 @@ private[sql] class DorisSourceProvider extends DataSourceRegister val sparkSettings = new SparkSettings(sqlContext.sparkContext.getConf) sparkSettings.merge(Utils.params(parameters, logger).asJava) // init stream loader - val dorisStreamLoader = new DorisStreamLoad(sparkSettings, data.columns) + val writer = new DorisWriter(sparkSettings) + writer.write(data) - val maxRowCount = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT) - val maxRetryTimes = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT) - val sinkTaskPartitionSize = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE) - val sinkTaskUseRepartition = sparkSettings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean - val batchInterValMs = sparkSettings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS, ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT) - - logger.info(s"maxRowCount ${maxRowCount}") - logger.info(s"maxRetryTimes ${maxRetryTimes}") - logger.info(s"batchInterVarMs ${batchInterValMs}") - - var resultRdd = data.rdd - if (Objects.nonNull(sinkTaskPartitionSize)) { - resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize) - } - - resultRdd.foreachPartition(partition => { - val rowsBuffer: util.List[util.List[Object]] = new util.ArrayList[util.List[Object]](maxRowCount) - partition.foreach(row => { - val line: util.List[Object] = new util.ArrayList[Object]() - for (i <- 0 until row.size) { - val field = row.get(i) - line.add(field.asInstanceOf[AnyRef]) - } - rowsBuffer.add(line) - if (rowsBuffer.size > maxRowCount - 1 ) { - flush() - } - }) - // flush buffer - if (!rowsBuffer.isEmpty) { - flush() - } - - /** - * flush data to Doris and do retry when flush error - * - */ - def flush(): Unit = { - Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) { - dorisStreamLoader.loadV2(rowsBuffer) - rowsBuffer.clear() - } match { - case Success(_) => - case Failure(e) => - throw new IOException( - s"Failed to load $maxRowCount batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", e) - } - } - - }) new BaseRelation { override def sqlContext: SQLContext = unsupportedException diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala index 46448205..342e940e 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisStreamLoadSink.scala @@ -17,69 +17,27 @@ package org.apache.doris.spark.sql -import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings} -import org.apache.doris.spark.{CachedDorisStreamLoadClient, DorisStreamLoad} -import org.apache.spark.rdd.RDD +import org.apache.doris.spark.cfg.SparkSettings +import org.apache.doris.spark.writer.DorisWriter import org.apache.spark.sql.execution.streaming.Sink -import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.apache.spark.sql.{DataFrame, SQLContext} import org.slf4j.{Logger, LoggerFactory} -import java.io.IOException -import java.time.Duration -import java.util -import java.util.Objects -import scala.collection.JavaConverters._ -import scala.util.{Failure, Success} - private[sql] class DorisStreamLoadSink(sqlContext: SQLContext, settings: SparkSettings) extends Sink with Serializable { private val logger: Logger = LoggerFactory.getLogger(classOf[DorisStreamLoadSink].getName) @volatile private var latestBatchId = -1L - val batchSize: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT) - val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT) - val sinkTaskPartitionSize = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE) - val sinkTaskUseRepartition = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean - val batchInterValMs = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS, ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT) - val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings) + private val writer = new DorisWriter(settings) override def addBatch(batchId: Long, data: DataFrame): Unit = { if (batchId <= latestBatchId) { logger.info(s"Skipping already committed batch $batchId") } else { - write(data.rdd) + writer.write(data) latestBatchId = batchId } } - def write(rdd: RDD[Row]): Unit = { - var resultRdd = rdd - if (Objects.nonNull(sinkTaskPartitionSize)) { - resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize) - } - resultRdd - .map(_.toSeq.map(_.asInstanceOf[AnyRef]).toList.asJava) - .foreachPartition(partition => { - partition - .grouped(batchSize) - .foreach(batch => flush(batch)) - }) - - /** - * flush data to Doris and do retry when flush error - * - */ - def flush(batch: Iterable[util.List[Object]]): Unit = { - Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) { - dorisStreamLoader.loadV2(batch.toList.asJava) - } match { - case Success(_) => - case Failure(e) => - throw new IOException( - s"Failed to load batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max $maxRetryTimes retry times.", e) - } - } - } - override def toString: String = "DorisStreamLoadSink" } diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala 
b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala index ba6fa861..2f3a5bb0 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala @@ -31,7 +31,7 @@ import scala.annotation.tailrec import scala.reflect.ClassTag import scala.util.{Failure, Success, Try} -private[sql] object Utils { +private[spark] object Utils { /** * quote column name * @param colName column name @@ -169,7 +169,9 @@ private[sql] object Utils { assert(retryTimes >= 0) val result = Try(f) result match { - case Success(result) => Success(result) + case Success(result) => + LockSupport.parkNanos(interval.toNanos) + Success(result) case Failure(exception: T) if retryTimes > 0 => logger.warn(s"Execution failed caused by: ", exception) logger.warn(s"$retryTimes times retry remaining, the next will be in ${interval.toMillis}ms") diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala new file mode 100644 index 00000000..3839ff71 --- /dev/null +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.spark.writer + +import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings} +import org.apache.doris.spark.load.{CachedDorisStreamLoadClient, DorisStreamLoad} +import org.apache.doris.spark.sql.Utils +import org.apache.spark.sql.DataFrame +import org.slf4j.{Logger, LoggerFactory} + +import java.io.IOException +import java.time.Duration +import java.util +import java.util.Objects +import scala.collection.JavaConverters._ +import scala.util.{Failure, Success} + +class DorisWriter(settings: SparkSettings) extends Serializable { + + private val logger: Logger = LoggerFactory.getLogger(classOf[DorisWriter]) + + val batchSize: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_SIZE, + ConfigurationOptions.SINK_BATCH_SIZE_DEFAULT) + private val maxRetryTimes: Int = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_MAX_RETRIES, + ConfigurationOptions.SINK_MAX_RETRIES_DEFAULT) + private val sinkTaskPartitionSize: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE) + private val sinkTaskUseRepartition: Boolean = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION, + ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean + private val batchInterValMs: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS, + ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT) + + private val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings) + + def write(dataFrame: DataFrame): Unit = { + var resultRdd = dataFrame.rdd + val dfColumns = dataFrame.columns + if (Objects.nonNull(sinkTaskPartitionSize)) { + resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize) + } + resultRdd + .map(_.toSeq.map(_.asInstanceOf[AnyRef]).toList.asJava) + .foreachPartition(partition => { + partition + .grouped(batchSize) + .foreach(batch => flush(batch, dfColumns)) + }) + + /** + * flush data to Doris and do retry when flush error + * + */ + def flush(batch: Iterable[util.List[Object]], dfColumns: Array[String]): Unit = { + Utils.retry[Unit, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) { + dorisStreamLoader.loadV2(batch.toList.asJava, dfColumns) + } match { + case Success(_) => + case Failure(e) => + throw new IOException( + s"Failed to load batch data on BE: ${dorisStreamLoader.getLoadUrlStr} node and exceeded the max ${maxRetryTimes} retry times.", e) + } + } + + } + + +} From 84dbb2c4b18a1c0d96ba0aa3a2d45f89dde976f7 Mon Sep 17 00:00:00 2001 From: gnehil Date: Mon, 5 Jun 2023 12:09:10 +0800 Subject: [PATCH 4/8] Update version (#106) --- spark-doris-connector/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml index e4b4c8b9..def5c00b 100644 --- a/spark-doris-connector/pom.xml +++ b/spark-doris-connector/pom.xml @@ -27,7 +27,7 @@ org.apache.doris spark-doris-connector-${spark.major.version}_${scala.version} - 1.0.0-SNAPSHOT + 1.2.0-SNAPSHOT Spark Doris Connector https://doris.apache.org/ From 9291a2e112ce683413c9e18ca0f91931f5156d9e Mon Sep 17 00:00:00 2001 From: caoliang-web <71004656+caoliang-web@users.noreply.github.com> Date: Mon, 5 Jun 2023 14:31:27 +0800 Subject: [PATCH 5/8] Update pom.xml (#107) --- spark-doris-connector/pom.xml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git 
a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml index def5c00b..471eecd9 100644 --- a/spark-doris-connector/pom.xml +++ b/spark-doris-connector/pom.xml @@ -27,7 +27,7 @@ org.apache.doris spark-doris-connector-${spark.major.version}_${scala.version} - 1.2.0-SNAPSHOT + ${revision} Spark Doris Connector https://doris.apache.org/ @@ -62,6 +62,7 @@ + 1.2.0-SNAPSHOT 3.1.2 3.1 2.12 From 53525c20bcfb638da68356af6e50f65aa5e5c995 Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Mon, 5 Jun 2023 15:57:54 +0800 Subject: [PATCH 6/8] Pom update (#108) --- spark-doris-connector/pom.xml | 441 +++++++++++++++++++++------------- 1 file changed, 268 insertions(+), 173 deletions(-) diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml index 471eecd9..a5a7e7a1 100644 --- a/spark-doris-connector/pom.xml +++ b/spark-doris-connector/pom.xml @@ -30,6 +30,7 @@ ${revision} Spark Doris Connector https://doris.apache.org/ + Apache 2.0 License @@ -37,16 +38,19 @@ repo + scm:git:https://git@github.com/apache/doris-spark-connector.git scm:git:https://git@github.com/apache/doris-spark-connector.git scm:git:https://git@github.com/apache/doris-spark-connector.git HEAD + GitHub https://github.com/apache/doris/issues + Dev Mailing List @@ -61,6 +65,7 @@ commits-unsubscribe@doris.apache.org + 1.2.0-SNAPSHOT 3.1.2 @@ -181,6 +186,170 @@ + + + + + org.apache.maven.plugins + maven-gpg-plugin + + + sign-artifacts + verify + + sign + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 3.2.0 + + + add-source + generate-sources + + add-source + + + + + ${basedir}/src/main/java + ${basedir}/src/main/scala + + + + + + + net.alchim31.maven + scala-maven-plugin + 3.2.1 + + + scala-compile-first + process-resources + + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + -feature + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.1 + + + + com.google.code.findbugs:* + org.slf4j:* + + + + + org.apache.arrow + org.apache.doris.shaded.org.apache.arrow + + + io.netty + org.apache.doris.shaded.io.netty + + + com.fasterxml.jackson + org.apache.doris.shaded.com.fasterxml.jackson + + + org.apache.commons.codec + org.apache.doris.shaded.org.apache.commons.codec + + + com.google.flatbuffers + org.apache.doris.shaded.com.google.flatbuffers + + + org.apache.thrift + org.apache.doris.shaded.org.apache.thrift + + + + + + package + + shade + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 8 + 8 + + + + org.codehaus.mojo + license-maven-plugin + 2.0.0 + + + add-third-party + + add-third-party + + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + org.codehaus.mojo + build-helper-maven-plugin + + + + net.alchim31.maven + scala-maven-plugin + + + + org.codehaus.mojo + license-maven-plugin + + + + + @@ -203,6 +372,7 @@ + general-env @@ -219,178 +389,103 @@ + + + apache-release + + + + + org.apache.maven.plugins + maven-gpg-plugin + + + sign-artifacts + verify + + sign + + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + + true + 8 + false + + + + attach-javadocs + + jar + + + + + + + org.apache.maven.plugins + maven-source-plugin + + true + + + + compile + + jar + + + + + + + org.apache.maven.plugins + maven-release-plugin + + ${releaseArgs} + + + + + org.apache.maven.plugins + maven-deploy-plugin + 3.0.0-M1 + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + + + + org.apache.maven.plugins + maven-source-plugin + + + + org.apache.maven.plugins + 
maven-gpg-plugin + + + + org.apache.maven.plugins + maven-release-plugin + + + + org.apache.maven.plugins + maven-deploy-plugin + + + + - - - - org.apache.maven.plugins - maven-gpg-plugin - - - sign-artifacts - verify - - sign - - - - - - - org.codehaus.mojo - build-helper-maven-plugin - 3.2.0 - - - add-source - generate-sources - - add-source - - - - - ${basedir}/src/main/java - ${basedir}/src/main/scala - - - - - - - net.alchim31.maven - scala-maven-plugin - 3.2.1 - - - scala-compile-first - process-resources - - compile - - - - scala-test-compile - process-test-resources - - testCompile - - - - - - -feature - - - - - org.apache.maven.plugins - maven-shade-plugin - 3.2.1 - - - - com.google.code.findbugs:* - org.slf4j:* - - - - - org.apache.arrow - org.apache.doris.shaded.org.apache.arrow - - - io.netty - org.apache.doris.shaded.io.netty - - - com.fasterxml.jackson - org.apache.doris.shaded.com.fasterxml.jackson - - - org.apache.commons.codec - org.apache.doris.shaded.org.apache.commons.codec - - - com.google.flatbuffers - org.apache.doris.shaded.com.google.flatbuffers - - - org.apache.thrift - org.apache.doris.shaded.org.apache.thrift - - - - - - package - - shade - - - - - - org.apache.maven.plugins - maven-compiler-plugin - - 8 - 8 - - - - org.apache.maven.plugins - maven-javadoc-plugin - - true - 8 - false - - - - attach-javadocs - - jar - - - - - - org.apache.maven.plugins - maven-source-plugin - - true - - - - compile - - jar - - - - - - org.codehaus.mojo - license-maven-plugin - 2.0.0 - - - add-third-party - - add-third-party - - - - - - org.apache.maven.plugins - maven-release-plugin - - ${releaseArgs} - - - - - + \ No newline at end of file From 83fdd53748ecd9aeab82c4d5996e9040ebe55f73 Mon Sep 17 00:00:00 2001 From: baishao_de <1025869797@qq.com> Date: Thu, 6 Apr 2023 08:48:15 +0800 Subject: [PATCH 7/8] Added atomicity control parameters for partition task, Parameter defaults to false. If false, StreamLoad can be performed multiple times per partition. If true, limit StreamLoad to one time per partition, that is, all the data of a partition is written to Doris via a StreamLoad task. Avoid submitting multiple StreamLoad tasks with the same data when the task fails to retry. 
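Because this mode maps each Spark partition to exactly one StreamLoad call, the number of StreamLoad transactions can be bounded by bounding the number of sink partitions with the existing repartition options. A hedged sketch follows, not part of the patch, with placeholder values:

    // Sketch (illustrative placeholders only): cap the sink at 8 partitions,
    // each flushed as a single StreamLoad, so a retried Spark task re-runs a
    // whole partition instead of re-submitting part of one.
    df.write
      .format("doris")
      // ... doris.fenodes / doris.table.identifier / user / password as usual ...
      .option("doris.sink.task.partition.size", "8")
      .option("doris.sink.task.use.repartition", "true")
      .option("doris.sink.per.partition.task.atomicity", "true")
      .mode("append")
      .save()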
---
 .../org/apache/doris/spark/cfg/ConfigurationOptions.java | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index a3f40611..909c5de9 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -90,4 +90,10 @@ public interface ConfigurationOptions {
 
     int DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT = 50;
+
+    //Set only one StreamLoad task to be submitted per partition to ensure that task retries do not result in repeated submission of StreamLoad tasks on the same batch of data if the task fails.
+    String DORIS_SINK_PER_PARTITION_TASK_ATOMICITY = "doris.sink.per.partition.task.atomicity";
+
+    boolean DORIS_SINK_PER_PARTITION_TASK_ATOMICITY_DEFAULT = false;
+
 }

From 6e1f6bb270ec5d703f19ee293be454c110748363 Mon Sep 17 00:00:00 2001
From: baishao_de <1025869797@qq.com>
Date: Tue, 6 Jun 2023 13:38:10 +0800
Subject: [PATCH 8/8] resolve conflict

---
 .../doris/spark/cfg/ConfigurationOptions.java | 15 ++++++-----
 .../doris/spark/writer/DorisWriter.scala      | 25 +++++++++++++------
 2 files changed, 24 insertions(+), 16 deletions(-)

diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 6a339895..a93e7fe0 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -86,19 +86,18 @@ public interface ConfigurationOptions {
 
     boolean DORIS_SINK_TASK_USE_REPARTITION_DEFAULT = false;
 
-// <<<<<<< baishaode_pr
-    //Set only one StreamLoad task to be submitted per partition to ensure that task retries do not result in repeated submission of StreamLoad tasks on the same batch of data if the task fails.
-    String DORIS_SINK_PER_PARTITION_TASK_ATOMICITY = "doris.sink.per.partition.task.atomicity";
-
-    boolean DORIS_SINK_PER_PARTITION_TASK_ATOMICITY_DEFAULT = false;
-// =======
     String DORIS_SINK_BATCH_INTERVAL_MS = "doris.sink.batch.interval.ms";
 
     int DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT = 50;
-// >>>>>>> master
-
-    //Set only one StreamLoad task to be submitted per partition to ensure that task retries do not result in repeated submission of StreamLoad tasks on the same batch of data if the task fails.
+
+
+    /*
+    Set only one StreamLoad task to be submitted per partition
+    to ensure that task retries do not result in repeated submission
+    of StreamLoad tasks on the same batch of data if the task fails.
+ */ String DORIS_SINK_PER_PARTITION_TASK_ATOMICITY = "doris.sink.per.partition.task.atomicity"; boolean DORIS_SINK_PER_PARTITION_TASK_ATOMICITY_DEFAULT = false; diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala index 3839ff71..66f58c60 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala @@ -43,7 +43,8 @@ class DorisWriter(settings: SparkSettings) extends Serializable { ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean private val batchInterValMs: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS, ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT) - + val partitionTaskAtomicity = settings.getProperty(ConfigurationOptions.DORIS_SINK_PER_PARTITION_TASK_ATOMICITY, + ConfigurationOptions.DORIS_SINK_PER_PARTITION_TASK_ATOMICITY_DEFAULT.toString).toBoolean private val dorisStreamLoader: DorisStreamLoad = CachedDorisStreamLoadClient.getOrCreate(settings) def write(dataFrame: DataFrame): Unit = { @@ -52,13 +53,21 @@ class DorisWriter(settings: SparkSettings) extends Serializable { if (Objects.nonNull(sinkTaskPartitionSize)) { resultRdd = if (sinkTaskUseRepartition) resultRdd.repartition(sinkTaskPartitionSize) else resultRdd.coalesce(sinkTaskPartitionSize) } - resultRdd - .map(_.toSeq.map(_.asInstanceOf[AnyRef]).toList.asJava) - .foreachPartition(partition => { - partition - .grouped(batchSize) - .foreach(batch => flush(batch, dfColumns)) - }) + if (partitionTaskAtomicity) { + resultRdd + .map(_.toSeq.map(_.asInstanceOf[AnyRef]).toList.asJava) + .foreachPartition(partition => { + flush(partition.toIterable, dfColumns) + }) + } else { + resultRdd + .map(_.toSeq.map(_.asInstanceOf[AnyRef]).toList.asJava) + .foreachPartition(partition => { + partition + .grouped(batchSize) + .foreach(batch => flush(batch, dfColumns)) + }) + } /** * flush data to Doris and do retry when flush error