Apache® Pulsar™ からのデータを継続的にロードする
StarRocks バージョン 2.5 から、Routine Load は Apache® Pulsar™ からのデータを継続的にロードすることをサポートしています。Pulsar は、ストアとコンピュートの分離アーキテクチャを持つ、分散型のオープンソースの pub-sub メッセージングおよびストリーミングプラットフォームです。Routine Load を介して Pulsar からデータをロードすることは、Apache Kafka からデータをロードすることに似ています。このトピックでは、CSV 形式のデータを例に、Routine Load を介して Apache Pulsar からデータをロードする方法を紹介します。
サポートされているデータファイル形式
Routine Load は、Pulsar クラスターから CSV および JSON 形式のデータを消費することをサポートしています。
NOTE
CSV 形式のデータについては、StarRocks は 50 バイト以内の UTF-8 エンコードされた文字列をカラムセパレータとしてサポートしています。一般的に使用されるカラムセパレータには、カンマ (,) 、タブ、パイプ (|) があります。
Pulsar に関連する概念
Pulsar のトピックは、プロデューサーからコンシューマーにメッセージを送信するための名前付きチャネルです。Pulsar のトピックは、パーティション付きトピックと非パーティション付きトピックに分かれています。
- パーティション付きトピック は、複数のブローカーによって処理される特別なタイプのトピックであり、より高いスループットを可能にします。パーティション付きトピックは実際には N 個の内部トピックとして実装されており、N はパーティションの数です。
- 非パーティション付きトピック は、単一のブローカーによってのみ提供される通常のタイプのトピックであり、トピックの最大スループットを制限します。
メッセージのメッセージ ID は、メッセージが永続的に保存されるとすぐに BookKeeper インスタンス によって割り当てられます。メッセージ ID は、台帳内のメッセージの特定の位置を示し、Pulsar クラスター内で一意です。
Pulsar は、コンシューマーが consumer.seek(messageId) を通じて初期位置を指定することをサポートしています。しかし、Kafka コンシューマーオフセットが長整数値であるのに対し、メッセージ ID は ledgerId:entryID:partition-index:batch-index の4つの部分で構成されています。
したがって、メッセージから直接メッセージ ID を取得することはできません。その結果、現在、Routine Load は Pulsar からデータをロードする際に初期位置を指定することをサポートしておらず、パーティションの開始または終了からデータを消費することのみをサポートしています。
サブスクリプションは、メッセージがコンシューマーにどのように配信されるかを決定する名前付きの設定ルールです。Pulsar は、コンシューマーが複数のトピックに同時にサブスクライブすることもサポートしています。トピックには複数のサブスクリプションを持つことができます。
サブスクリプションのタイプは、コンシューマーが接続する際に定義され、すべてのコンシューマーを異なる設定で再起動することで変更できます。Pulsar には4つのサブスクリプションタイプがあります:
- exclusive(デフォルト): 単一のコンシューマーのみがサブスクリプションに接続できます。1人の顧客のみがメッセージを消費できます。
- shared: 複数のコンシューマーが同じサブスクリプションに接続できます。メッセージはコンシューマー間でラウンドロビン配信され、特定のメッセージは1つのコンシューマーにのみ配信されます。
- failover: 複数のコンシューマーが同じサブスクリプションに接続できます。非パーティション付きトピックまたはパーティション付きトピックの各パーティションに対してマスターコンシューマーが選ばれ、メッセージを受信します。マスターコンシューマーが切断されると、すべての(未確認および後続の)メッセージが次のコンシューマーに配信されます。
- key_shared: 複数のコンシューマーが同じサブスクリプションに接続できます。メッセージはコンシューマー間で配信され、同じキーまたは同じ順序キーを持つメッセージは1つのコンシューマーにのみ配信されます。
Note:
現在、Routine Load は exclusive タイプを使用しています。
Routine Load ジョブを作成する
以下の例は、Pulsar で CSV 形式のメッセージを消費し、Routine Load ジョブを作成して StarRocks にデータをロードする方法を説明します。詳細な手順と参考情報については、CREATE ROUTINE LOAD を参照してください。
CREATE ROUTINE LOAD load_test.routine_wiki_edit_1 ON routine_wiki_edit
COLUMNS TERMINATED BY ",",
ROWS TERMINATED BY "\n",
COLUMNS (order_id, pay_dt, customer_name, nationality, temp_gender, price)
WHERE event_time > "2022-01-01 00:00:00",
PROPERTIES
(
    "desired_concurrent_number" = "1",
    "max_batch_interval" = "15000",
    "max_error_number" = "1000"
)
FROM PULSAR
(
    "pulsar_service_url" = "pulsar://localhost:6650",
    "pulsar_topic" = "persistent://tenant/namespace/topic-name",
    "pulsar_subscription" = "load-test",
    "pulsar_partitions" = "load-partition-0,load-partition-1",
    "pulsar_initial_positions" = "POSITION_EARLIEST,POSITION_LATEST",
    "property.auth.token" = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUJzdWIiOiJqaXV0aWFuY2hlbiJ9.lulGngOC72vE70OW54zcbyw7XdKSOxET94WT_hIqD5Y"
);
Routine Load が Pulsar からデータを消費するために作成されると、data_source_properties を除くほとんどの入力パラメータは Kafka からデータを消費する場合と同じです。data_source_properties を除くパラメータの説明については、CREATE ROUTINE LOAD を参照してください。
data_source_properties に関連するパラメータとその説明は次のとおりです:
| Parameter | Required | Description | 
|---|---|---|
| pulsar_service_url | Yes | Pulsar クラスターに接続するために使用される URL。形式: "pulsar://ip:port"または"pulsar://service:port"。例:"pulsar_service_url" = "pulsar://``localhost:6650``" | 
| pulsar_topic | Yes | サブスクライブされたトピック。例: "pulsar_topic" = "persistent://tenant/namespace/topic-name" | 
| pulsar_subscription | Yes | トピックに設定されたサブスクリプション。例: "pulsar_subscription" = "my_subscription" | 
| pulsar_partitions, pulsar_initial_positions | No | pulsar_partitions: トピック内のサブスクライブされたパーティション。pulsar_initial_positions:pulsar_partitionsで指定されたパーティションの初期位置。初期位置はpulsar_partitionsのパーティションに対応している必要があります。有効な値:POSITION_EARLIEST(デフォルト値): サブスクリプションはパーティション内の最も早く利用可能なメッセージから開始します。POSITION_LATEST: サブスクリプションはパーティション内の最新の利用可能なメッセージから開始します。注意:pulsar_partitionsが指定されていない場合、トピックのすべてのパーティションがサブスクライブされます。pulsar_partitionsとproperty.pulsar_default_initial_positionの両方が指定されている場合、pulsar_partitionsの値がproperty.pulsar_default_initial_positionの値を上書きします。pulsar_partitionsもproperty.pulsar_default_initial_positionも指定されていない場合、サブスクリプションはパーティション内の最新の利用可能なメッセージから開始します。例:"pulsar_partitions" = "my-partition-0,my-partition-1,my-partition-2,my-partition-3", "pulsar_initial_positions" = "POSITION_EARLIEST,POSITION_EARLIEST,POSITION_LATEST,POSITION_LATEST" | 
Routine Load は、Pulsar 用の以下のカスタムパラメータをサポートしています。
| Parameter | Required | Description | 
|---|---|---|
| property.pulsar_default_initial_position | No | トピックのパーティションがサブスクライブされたときのデフォルトの初期位置。このパラメータは pulsar_initial_positionsが指定されていない場合に有効です。その有効な値はpulsar_initial_positionsの有効な値と同じです。例:"``property.pulsar_default_initial_position" = "POSITION_EARLIEST" | 
| property.auth.token | No | Pulsar がセキュリティトークンを使用してクライアントを認証する場合、あなたの身元を確認するためにトークン文字列が必要です。例: "p``roperty.auth.token" = "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUJzdWIiOiJqaXV0aWFuY2hlbiJ9.lulGngOC72vE70OW54zcbyw7XdKSOxET94WT_hIqD" | 
ロードジョブとタスクを確認する
ロードジョブを確認する
SHOW ROUTINE LOAD ステートメントを実行して、ロードジョブ routine_wiki_edit_1 のステータスを確認します。StarRocks は、実行状態 State、統計情報(消費された総行数とロードされた総行数を含む)Statistics、およびロードジョブの進行状況 progress を返します。
Pulsar からデータを消費する Routine Load ジョブを確認する際、progress を除くほとんどの返されるパラメータは Kafka からデータを消費する場合と同じです。progress はバックログ、つまりパーティション内の未確認メッセージの数を指します。
MySQL [load_test] > SHOW ROUTINE LOAD for routine_wiki_edit_1 \G
*************************** 1. row ***************************
                  Id: 10142
                Name: routine_wiki_edit_1
          CreateTime: 2022-06-29 14:52:55
           PauseTime: 2022-06-29 17:33:53
             EndTime: NULL
              DbName: default_cluster:test_pulsar
           TableName: test1
               State: PAUSED
      DataSourceType: PULSAR
      CurrentTaskNum: 0
       JobProperties: {"partitions":"*","rowDelimiter":"'\n'","partial_update":"false","columnToColumnExpr":"*","maxBatchIntervalS":"10","whereExpr":"*","timezone":"Asia/Shanghai","format":"csv","columnSeparator":"','","json_root":"","strict_mode":"false","jsonpaths":"","desireTaskConcurrentNum":"3","maxErrorNum":"10","strip_outer_array":"false","currentTaskConcurrentNum":"0","maxBatchRows":"200000"}
DataSourceProperties: {"serviceUrl":"pulsar://localhost:6650","currentPulsarPartitions":"my-partition-0,my-partition-1","topic":"persistent://tenant/namespace/topic-name","subscription":"load-test"}
    CustomProperties: {"auth.token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUJzdWIiOiJqaXV0aWFuY2hlbiJ9.lulGngOC72vE70OW54zcbyw7XdKSOxET94WT_hIqD"}
           Statistic: {"receivedBytes":5480943882,"errorRows":0,"committedTaskNum":696,"loadedRows":66243440,"loadRowsRate":29000,"abortedTaskNum":0,"totalRows":66243440,"unselectedRows":0,"receivedBytesRate":2400000,"taskExecuteTimeMs":2283166}
            Progress: {"my-partition-0(backlog): 100","my-partition-1(backlog): 0"}
ReasonOfStateChanged: 
        ErrorLogUrls: 
            OtherMsg:
1 row in set (0.00 sec)
ロードタスクを確認する
SHOW ROUTINE LOAD TASK ステートメントを実行して、ロードジョブ routine_wiki_edit_1 のロードタスクを確認します。例えば、いくつのタスクが実行中か、消費されている Kafka トピックのパーティションと消費の進行状況 DataSourceProperties、および対応する Coordinator BE ノード BeId などです。
MySQL [example_db]> SHOW ROUTINE LOAD TASK WHERE JobName = "routine_wiki_edit_1" \G
ロードジョブを変更する
ロードジョブを変更する前に、PAUSE ROUTINE LOAD ステートメントを使用して一時停止する必要があります。その後、ALTER ROUTINE LOAD を実行できます。変更後、RESUME ROUTINE LOAD ステートメントを実行して再開し、SHOW ROUTINE LOAD ステートメントを使用してそのステータスを確認できます。
Routine Load が Pulsar からデータを消費するために使用される場合、data_source_properties を除くほとんどの返されるパラメータは Kafka からデータを消費する場合と同じです。
次の点に注意してください:
- data_source_propertiesに関連するパラメータの中で、現在サポートされているのは- pulsar_partitions、- pulsar_initial_positions、およびカスタム Pulsar パラメータ- property.pulsar_default_initial_positionと- property.auth.tokenのみです。- pulsar_service_url、- pulsar_topic、および- pulsar_subscriptionは変更できません。
- 消費するパーティションと一致する初期位置を変更する必要がある場合は、Routine Load ジョブを作成する際に pulsar_partitionsを使用してパーティションを指定し、指定されたパーティションの初期位置pulsar_initial_positionsのみを変更できることを確認する必要があります。
- Routine Load ジョブを作成する際にトピック pulsar_topicのみを指定し、パーティションpulsar_partitionsを指定しない場合、トピックのすべてのパーティションの開始位置をpulsar_default_initial_positionを介して変更できます。