Stream Load トランザクションインターフェースを使用したデータのロード
バージョン 2.4 以降、StarRocks は Stream Load トランザクションインターフェースを提供し、Apache Flink® や Apache Kafka® などの外部システムからデータをロードするために実行されるトランザクションに対して、2 フェーズコミット (2PC) を実装します。Stream Load トランザクションインターフェースは、高度に並行したストリームロードのパフォーマンスを向上させます。
バージョン 4.0 以降、Stream Load トランザクションインターフェースは複数テーブルトランザクションをサポートします。つまり、同一データベース内の複数のテーブルにデータをロードすることが可能です。
このトピックでは、Stream Load トランザクションインターフェースと、このインターフェースを使用して StarRocks にデータをロードする方法について説明します。
説明
Stream Load トランザクションインターフェースは、HTTP プロトコル互換のツールや言語を使用して API 操作を呼び出すことをサポートします。このトピックでは、curl を例にとってこのインターフェースの使用方法を説明します。このインターフェースは、トランザクション管理、データ書き込み、トランザクションの事前コミット、トランザクションの重複排除、トランザクションのタイムアウト管理など、さまざまな機能を提供します。
Stream Load は CSV および JSON ファイル形式をサポートします。個々のサイズが 10 GB を超えない少数のファイルからデータをロードしたい場合、この方法が推奨されます。Stream Load は Parquet ファイル形式をサポートしていません。Parquet ファイルからデータをロードする必要がある場合は、INSERT+files() を使用してください。
トランザクション管理
Stream Load トランザクションインターフェースは、トランザクションを管理するために使用される以下の API 操作を提供します。
- 
/api/transaction/begin: 新しいトランザクションを開始します。
- 
/api/transaction/prepare: 現在のトランザクションを事前コミットし、データ変更を一時的に永続化します。トランザクションを事前コミットした後、コミットまたはロールバックを実行できます。トランザクションが事前コミットされた後にクラスターがクラッシュした場合でも、クラスターが復旧した後、トランザクションをコミットし続けることができます。
- 
/api/transaction/commit: 現在のトランザクションをコミットしてデータの変更を永続化します。
- 
/api/transaction/rollback: 現在のトランザクションをロールバックしてデータの変更を中止します。
NOTE
トランザクションが事前コミットされた後は、そのトランザクションを使用してデータを書き続けないでください。トランザクションを使用してデータを書き続けると、書き込みリクエストがエラーを返します。
以下の図は、トランザクションの状態と操作の関係を示しています:
stateDiagram-v2
    direction LR
    [*] --> PREPARE : begin
    PREPARE --> PREPARED : prepare
    PREPARE --> ABORTED : rollback
    PREPARED --> COMMITTED : commit
    PREPARED --> ABORTED : rollback
データ書き込み
Stream Load トランザクションインターフェースは、データを書き込むための /api/transaction/load 操作を提供します。この操作は、1 つのトランザクション内で複数回呼び出すことができます。
バージョン 4.0 以降、異なるテーブルに対して /api/transaction/load 操作を呼び出すことで、同一データベース内の複数テーブルにデータをロードできます。
トランザクションの重複排除
Stream Load トランザクションインターフェースは、StarRocks のラベリングメカニズムを引き継いでいます。各トランザクションに一意のラベルをバインドすることで、トランザクションに対して最大 1 回の保証を実現できます。
トランザクションのタイムアウト管理
トランザクションを開始する際、HTTP リクエストヘッダーの timeout フィールドを使用して、PREPARE 状態から PREPARED 状態へのトランザクションのタイムアウト期間(秒単位)を指定できます。この期間内にトランザクションが準備完了状態に達しない場合、自動的に中止されます。このフィールドが指定されていない場合、デフォルト値は FE 設定の stream_load_default_timeout_secondによって決定されます(デフォルト:600 秒)。
トランザクションを開始する際、HTTP リクエストヘッダーの idle_transaction_timeout フィールドを使用して、トランザクションがアイドル状態のまま保持できるタイムアウト期間(秒単位)を指定できます。この期間内にデータが書き込まれない場合、トランザクションは自動的にロールバックされます。
トランザクションを準備する際、HTTP リクエストヘッダーの prepared_timeout フィールドを使用して、トランザクションが PREPARED 状態から COMMITTED 状態に移行するまでのタイムアウト期間(秒単位)を指定できます。この期間内にトランザクションがコミットされない場合、自動的に中止されます。このフィールドが指定されていない場合、デフォルト値は FE 設定の prepared_transaction_default_timeout_second によって決定されます(デフォルト:86400秒)。prepared_timeout は v3.5.4 以降でサポートされています。
利点
Stream Load トランザクションインターフェースは、次の利点をもたらします。
- 
厳密な一度だけのセマンティクス トランザクションは、事前コミットとコミットの 2 つのフェーズに分割され、システム間でデータをロードしやすくします。たとえば、このインターフェースは Flink からのデータロードに対して厳密な一度だけのセマンティクスを保証できます。 
- 
ロードパフォーマンスの向上 プログラムを使用してロードジョブを実行する場合、Stream Load トランザクションインターフェースを使用すると、複数のミニバッチのデータをオンデマンドでマージし、1 つのトランザクション内で /api/transaction/commit操作を呼び出して一度に送信できます。その結果、ロードするデータバージョンが少なくなり、ロードパフォーマンスが向上します。
制限
Stream Load トランザクションインターフェースには、次の制限があります。
- 
単一データベース複数テーブルトランザクションは v4.0 以降でサポートされています。複数データベース複数テーブルトランザクションのサポートは開発中です。 
- 
1 クライアントからの並行データ書き込み のみがサポートされています。複数クライアントからの並行データ書き込み のサポートは開発中です。 
- 
/api/transaction/load操作は、1 つのトランザクション内で複数回呼び出すことができます。この場合、呼び出されたすべての/api/transaction/load操作に指定されたパラメータ設定(tableを除く)は同じでなければなりません。
- 
Stream Load トランザクションインターフェースを使用して CSV 形式のデータをロードする場合、データファイル内の各データレコードが行区切り文字で終わるようにしてください。 
注意事項
- 呼び出した /api/transaction/begin、/api/transaction/load、または/api/transaction/prepare操作がエラーを返した場合、トランザクションは失敗し、自動的にロールバックされます。
- 新しいトランザクションを開始するために /api/transaction/begin操作を呼び出す際、ラベルを指定する必要があります。なお、後続の/api/transaction/load、/api/transaction/prepare、および/api/transaction/commit操作は、/api/transaction/begin操作と同じラベルを使用する必要があります。
- 進行中のトランザクションのラベルを使用して /api/transaction/begin操作を呼び出し新しいトランザクションを開始すると、以前のトランザクションは失敗しロールバックされます。
- 複数のテーブルにデータをロードするためにマルチテーブルトランザクションを使用する場合、トランザクションに関連するすべての操作に対してパラメータ -H "transaction_type:multi"を指定する必要があります。
- StarRocks が CSV 形式のデータに対してサポートするデフォルトのカラムセパレータと行区切り文字は \tと\nです。データファイルがデフォルトのカラムセパレータまたは行区切り文字を使用していない場合、/api/transaction/load操作を呼び出す際に、データファイルで実際に使用されているカラムセパレータまたは行区切り文字を"column_separator: <column_separator>"または"row_delimiter: <row_delimiter>"を使用して指定する必要があります。
始める前に
権限の確認
StarRocks のテーブルにデータを ロード するには、その StarRocks テーブルに対して INSERT 権限を持つユーザーである必要があります。INSERT 権限がない場合は、GRANT に記載されている手順に従って、StarRocks クラスターに接続するために使用するユーザーに INSERT 権限を付与してください。構文は GRANT INSERT ON TABLE <table_name> IN DATABASE <database_name> TO { ROLE <role_name> | USER <user_identity>} です。
ネットワーク設定の確認
ロードしたいデータが存在するマシンが、StarRocks クラスターの FE および BE ノードに http_port (デフォルト: 8030) および be_http_port (デフォルト: 8040) を介してアクセスできることを確認してください。
基本操作
サンプルデータの準備
このトピックでは、CSV 形式のデータを例として使用します。
- 
ローカルファイルシステムの /home/disk1/パスに、example1.csvという名前の CSV ファイルを作成します。このファイルは、ユーザー ID、ユーザー名、ユーザースコアを順に表す 3 つの列で構成されています。1,Lily,23
 2,Rose,23
 3,Alice,24
 4,Julia,25
- 
StarRocks データベース test_dbに、table1という名前の主キーテーブルを作成します。このテーブルは、id、name、scoreの 3 つの列で構成されており、idが主キーです。CREATE TABLE `table1`
 (
 `id` int(11) NOT NULL COMMENT "user ID",
 `name` varchar(65533) NULL COMMENT "user name",
 `score` int(11) NOT NULL COMMENT "user score"
 )
 ENGINE=OLAP
 PRIMARY KEY(`id`)
 DISTRIBUTED BY HASH(`id`) BUCKETS 10;
トランザクションの開始
構文
curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
    -H "Expect:100-continue" \
    [-H "transaction_type:multi"]\  # オプション。複数テーブルのトランザクションを開始します。
    -H "db:<database_name>" -H "table:<table_name>" \
    -XPOST http://<fe_host>:<fe_http_port>/api/transaction/begin
NOTE
トランザクション内で異なるテーブルにデータをロードしたい場合は、コマンドに
-H "transaction_type:multi"を指定してください。
例
curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
    -H "Expect:100-continue" \
    -H "db:test_db" -H "table:table1" \
    -XPOST http://<fe_host>:<fe_http_port>/api/transaction/begin
NOTE
この例では、
streamload_txn_example1_table1がトランザクションのラベルとして指定されています。
戻り結果
- 
トランザクションが正常に開始された場合、次の結果が返されます。 {
 "Status": "OK",
 "Message": "",
 "Label": "streamload_txn_example1_table1",
 "TxnId": 9032,
 "BeginTxnTimeMs": 0
 }
- 
トランザクションが重複したラベルにバインドされている場合、次の結果が返されます。 {
 "Status": "LABEL_ALREADY_EXISTS",
 "ExistingJobStatus": "RUNNING",
 "Message": "Label [streamload_txn_example1_table1] has already been used."
 }
- 
重複ラベル以外のエラーが発生した場合、次の結果が返されます。 {
 "Status": "FAILED",
 "Message": ""
 }
データの書き込み
構文
curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
    -H "Expect:100-continue" \
    [-H "transaction_type:multi"]\  # オプション。マルチテーブルトランザクションを介してデータをロードします。
    -H "db:<database_name>" -H "table:<table_name>" \
    -T <file_path> \
    -XPUT http://<fe_host>:<fe_http_port>/api/transaction/load
NOTE
/api/transaction/load操作を呼び出す際、<file_path>を使用してロードしたいデータファイルの保存パスを指定する必要があります。
/api/transaction/load操作を異なるtableパラメータ値で呼び出すことで、同じデータベース内の異なるテーブルにデータをロードできます。この場合、コマンドで-H "transaction_type:multi"を指定する必要があります。
例
curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
    -H "Expect:100-continue" \
    -H "db:test_db" -H "table:table1" \
    -T /home/disk1/example1.csv \
    -H "column_separator: ," \
    -XPUT http://<fe_host>:<fe_http_port>/api/transaction/load
NOTE
この例では、データファイル
example1.csvで使用されているカラムセパレータは、StarRocks のデフォルトのカラムセパレータ (\t) ではなく、カンマ (,) です。そのため、/api/transaction/load操作を呼び出す際に、"column_separator: <column_separator>"を使用してカンマ (,) をカラムセパレータとして指定する必要があります。
戻り結果
- 
データ書き込みが成功した場合、次の結果が返されます。 {
 "TxnId": 1,
 "Seq": 0,
 "Label": "streamload_txn_example1_table1",
 "Status": "OK",
 "Message": "",
 "NumberTotalRows": 5265644,
 "NumberLoadedRows": 5265644,
 "NumberFilteredRows": 0,
 "NumberUnselectedRows": 0,
 "LoadBytes": 10737418067,
 "LoadTimeMs": 418778,
 "StreamLoadPutTimeMs": 68,
 "ReceivedDataTimeMs": 38964,
 }
- 
トランザクションが不明と見なされる場合、次の結果が返されます。 {
 "TxnId": 1,
 "Label": "streamload_txn_example1_table1",
 "Status": "FAILED",
 "Message": "TXN_NOT_EXISTS"
 }
- 
トランザクションが無効な状態と見なされる場合、次の結果が返されます。 {
 "TxnId": 1,
 "Label": "streamload_txn_example1_table1",
 "Status": "FAILED",
 "Message": "Transcation State Invalid"
 }
- 
不明なトランザクションや無効な状態以外のエラーが発生した場合、次の結果が返されます。 {
 "TxnId": 1,
 "Label": "streamload_txn_example1_table1",
 "Status": "FAILED",
 "Message": ""
 }
トランザクションの事前コミット
構文
curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
    -H "Expect:100-continue" \
    [-H "transaction_type:multi"]\  # オプション。複数テーブルトランザクションを事前コミットします。
    -H "db:<database_name>" \
    [-H "prepared_timeout:<timeout_seconds>"] \
    -XPOST http://<fe_host>:<fe_http_port>/api/transaction/prepare
NOTE
事前コミットしたいトランザクションがマルチテーブルトランザクションの場合は、コマンドで
-H "transaction_type:multi"を指定してください。
例
curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
    -H "Expect:100-continue" \
    -H "db:test_db" \
    -H "prepared_timeout:300" \
    -XPOST http://<fe_host>:<fe_http_port>/api/transaction/prepare
NOTE
prepared_timeoutフィールドはオプションです。指定されない場合、デフォルト値は FE 設定prepared_transaction_default_timeout_secondによって決定されます(デフォルト: 86400 秒)。prepared_timeoutは v3.5.4 以降でサポートされています。
戻り結果
- 
事前コミットが成功した場合、次の結果が返されます。 {
 "TxnId": 1,
 "Label": "streamload_txn_example1_table1",
 "Status": "OK",
 "Message": "",
 "NumberTotalRows": 5265644,
 "NumberLoadedRows": 5265644,
 "NumberFilteredRows": 0,
 "NumberUnselectedRows": 0,
 "LoadBytes": 10737418067,
 "LoadTimeMs": 418778,
 "StreamLoadPutTimeMs": 68,
 "ReceivedDataTimeMs": 38964,
 "WriteDataTimeMs": 417851
 "CommitAndPublishTimeMs": 1393
 }
- 
トランザクションが存在しないと見なされる場合、次の結果が返されます。 {
 "TxnId": 1,
 "Label": "streamload_txn_example1_table1",
 "Status": "FAILED",
 "Message": "Transcation Not Exist"
 }
- 
事前コミットがタイムアウトした場合、次の結果が返されます。 {
 "TxnId": 1,
 "Label": "streamload_txn_example1_table1",
 "Status": "FAILED",
 "Message": "commit timeout",
 }
- 
存在しないトランザクションや事前コミットのタイムアウト以外のエラーが発生した場合、次の結果が返されます。 {
 "TxnId": 1,
 "Label": "streamload_txn_example1_table1",
 "Status": "FAILED",
 "Message": "publish timeout"
 }
トランザクションのコミット
構文
curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
    -H "Expect:100-continue" \
    [-H "transaction_type:multi"]\  # オプション。複数テーブルのトランザクションをコミットします。
    -H "db:<database_name>" \
    -XPOST http://<fe_host>:<fe_http_port>/api/transaction/commit
NOTE
コミットしたいトランザクションが複数テーブルトランザクションの場合は、コマンドで
-H "transaction_type:multi"を指定してください。
例
curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
    -H "Expect:100-continue" \
    -H "db:test_db" \
    -XPOST http://<fe_host>:<fe_http_port>/api/transaction/commit
戻り結果
- 
コミットが成功した場合、次の結果が返されます。 {
 "TxnId": 1,
 "Label": "streamload_txn_example1_table1",
 "Status": "OK",
 "Message": "",
 "NumberTotalRows": 5265644,
 "NumberLoadedRows": 5265644,
 "NumberFilteredRows": 0,
 "NumberUnselectedRows": 0,
 "LoadBytes": 10737418067,
 "LoadTimeMs": 418778,
 "StreamLoadPutTimeMs": 68,
 "ReceivedDataTimeMs": 38964,
 "WriteDataTimeMs": 417851
 "CommitAndPublishTimeMs": 1393
 }
- 
トランザクションが既にコミットされている場合、次の結果が返されます。 {
 "TxnId": 1,
 "Label": "streamload_txn_example1_table1",
 "Status": "OK",
 "Message": "Transaction already commited",
 }
- 
トランザクションが存在しないと見なされる場合、次の結果が返されます。 {
 "TxnId": 1,
 "Label": "streamload_txn_example1_table1",
 "Status": "FAILED",
 "Message": "Transcation Not Exist"
 }
- 
コミットがタイムアウトした場合、次の結果が返されます。 {
 "TxnId": 1,
 "Label": "streamload_txn_example1_table1",
 "Status": "FAILED",
 "Message": "commit timeout",
 }
- 
データの公開がタイムアウトした場合、次の結果が返されます。 {
 "TxnId": 1,
 "Label": "streamload_txn_example1_table1",
 "Status": "FAILED",
 "Message": "publish timeout",
 "CommitAndPublishTimeMs": 1393
 }
- 
存在しないトランザクションやタイムアウト以外のエラーが発生した場合、次の結果が返されます。 {
 "TxnId": 1,
 "Label": "streamload_txn_example1_table1",
 "Status": "FAILED",
 "Message": ""
 }
トランザクションのロールバック
構文
curl --location-trusted -u <username>:<password> -H "label:<label_name>" \
    -H "Expect:100-continue" \
    [-H "transaction_type:multi"]\  # Optional. Rolls back a multi-table transaction.
    -H "db:<database_name>" \
    -XPOST http://<fe_host>:<fe_http_port>/api/transaction/rollback
NOTE
ロールバックしたいトランザクションが複数テーブルトランザクションの場合は、コマンドで
-H "transaction_type:multi"を指定してください。
例
curl --location-trusted -u <jack>:<123456> -H "label:streamload_txn_example1_table1" \
    -H "Expect:100-continue" \
    -H "db:test_db" \
    -XPOST http://<fe_host>:<fe_http_port>/api/transaction/rollback
戻り結果
- 
ロールバックが成功した場合、次の結果が返されます。 {
 "TxnId": 1,
 "Label": "streamload_txn_example1_table1",
 "Status": "OK",
 "Message": ""
 }
- 
トランザクションが存在しないと見なされる場合、次の結果が返されます。 {
 "TxnId": 1,
 "Label": "streamload_txn_example1_table1",
 "Status": "FAILED",
 "Message": "Transcation Not Exist"
 }
- 
存在しないトランザクション以外のエラーが発生した場合、次の結果が返されます。 {
 "TxnId": 1,
 "Label": "streamload_txn_example1_table1",
 "Status": "FAILED",
 "Message": ""
 }
参考文献
Stream Load の適用シナリオやサポートされているデータファイル形式、および Stream Load の動作については、Loading from a local file system via Stream Load を参照してください。
Stream Load ジョブの作成に関する構文やパラメータについては、STREAM LOAD を参照してください。