Skip to content

Commit

Permalink
KafkaBetaRelease+AddConnectorNameInMetadata (#39919)
Browse files Browse the repository at this point in the history
* add connector name in the metadata
---------

Co-authored-by: annie-mac <xinlian@microsoft.com>
  • Loading branch information
xinlian12 and annie-mac committed Apr 26, 2024
1 parent 9083b7e commit 35514d9
Show file tree
Hide file tree
Showing 30 changed files with 492 additions and 300 deletions.
9 changes: 2 additions & 7 deletions sdk/cosmos/azure-cosmos-kafka-connect/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
## Release History

### 1.0.0-beta.1 (Unreleased)
### 1.0.0-beta.1 (2024-04-26)

#### Features Added
* Added Source connector. See [PR 39410](https://github.com/Azure/azure-sdk-for-java/pull/39410)
* Added Source connector. See [PR 39410](https://github.com/Azure/azure-sdk-for-java/pull/39410) and [PR 39919](https://github.com/Azure/azure-sdk-for-java/pull/39919)
* Added Sink connector. See [PR 39434](https://github.com/Azure/azure-sdk-for-java/pull/39434)
* Added throughput control support. See [PR 39218](https://github.com/Azure/azure-sdk-for-java/pull/39218)
* Added `ServicePrincipal` support - See [PR 39490](https://github.com/Azure/azure-sdk-for-java/pull/39490)
* Added `ItemPatch` support in sink connector - See [PR 39558](https://github.com/Azure/azure-sdk-for-java/pull/39558)
* Added support to use CosmosDB container for tracking metadata - See [PR 39634](https://github.com/Azure/azure-sdk-for-java/pull/39634)

#### Breaking Changes

#### Bugs Fixed

#### Other Changes
337 changes: 149 additions & 188 deletions sdk/cosmos/azure-cosmos-kafka-connect/pom.xml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@

package com.azure.cosmos.kafka.connect;

import com.azure.cosmos.implementation.apachecommons.lang.RandomUtils;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkConfig;
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkTask;
import com.azure.cosmos.kafka.connect.implementation.sink.CosmosSinkTaskConfig;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
Expand All @@ -29,13 +31,16 @@
*/
public final class CosmosSinkConnector extends SinkConnector {
private static final Logger LOGGER = LoggerFactory.getLogger(CosmosSinkConnector.class);
private static final String CONNECTOR_NAME = "name";

private CosmosSinkConfig sinkConfig;
private String connectorName;

@Override
public void start(Map<String, String> props) {
LOGGER.info("Starting the kafka cosmos sink connector");
this.sinkConfig = new CosmosSinkConfig(props);
this.connectorName = props.containsKey(CONNECTOR_NAME) ? props.get(CONNECTOR_NAME).toString() : "EMPTY";
}

@Override
Expand All @@ -48,7 +53,13 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
LOGGER.info("Setting task configurations with maxTasks {}", maxTasks);
List<Map<String, String>> configs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
configs.add(this.sinkConfig.originalsStrings());
Map<String, String> taskConfigs = this.sinkConfig.originalsStrings();
taskConfigs.put(CosmosSinkTaskConfig.SINK_TASK_ID,
String.format("%s-%s-%d",
"sink",
this.connectorName,
RandomUtils.nextInt(1, 9999999)));
configs.add(taskConfigs);
}

return configs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
import com.azure.cosmos.implementation.apachecommons.lang.RandomUtils;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
Expand Down Expand Up @@ -55,22 +56,27 @@
*/
public final class CosmosSourceConnector extends SourceConnector implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(CosmosSourceConnector.class);
private static final String CONNECTOR_NAME = "name";

private CosmosSourceConfig config;
private CosmosAsyncClient cosmosClient;
private MetadataMonitorThread monitorThread;
private MetadataKafkaStorageManager kafkaOffsetStorageReader;
private IMetadataReader metadataReader;
private String connectorName;

@Override
public void start(Map<String, String> props) {
LOGGER.info("Starting the kafka cosmos source connector");
this.config = new CosmosSourceConfig(props);
this.cosmosClient = CosmosClientStore.getCosmosClient(this.config.getAccountConfig());
this.connectorName = props.containsKey(CONNECTOR_NAME) ? props.get(CONNECTOR_NAME).toString() : "EMPTY";
this.cosmosClient = CosmosClientStore.getCosmosClient(this.config.getAccountConfig(), connectorName);

// IMPORTANT: sequence matters
this.kafkaOffsetStorageReader = new MetadataKafkaStorageManager(this.context().offsetStorageReader());
this.metadataReader = this.getMetadataReader();
this.monitorThread = new MetadataMonitorThread(
connectorName,
this.config.getContainersConfig(),
this.config.getMetadataConfig(),
this.context(),
Expand Down Expand Up @@ -199,6 +205,11 @@ private List<Map<String, String>> getFeedRangeTaskConfigs(List<FeedRangeTaskUnit
Map<String, String> taskConfigs = this.config.originalsStrings();
taskConfigs.putAll(
CosmosSourceTaskConfig.getFeedRangeTaskUnitsConfigMap(feedRangeTaskUnits));
taskConfigs.put(CosmosSourceTaskConfig.SOURCE_TASK_ID,
String.format("%s-%s-%d",
"source",
this.connectorName,
RandomUtils.nextInt(1, 9999999)));
feedRangeTaskConfigs.add(taskConfigs);
});

Expand Down Expand Up @@ -239,6 +250,7 @@ private Pair<MetadataTaskUnit, List<FeedRangeTaskUnit>> getAllTaskUnits() {

MetadataTaskUnit metadataTaskUnit =
new MetadataTaskUnit(
this.connectorName,
this.config.getContainersConfig().getDatabaseName(),
allContainers.stream().map(CosmosContainerProperties::getResourceId).collect(Collectors.toList()),
updatedContainerToFeedRangesMap,
Expand All @@ -264,7 +276,7 @@ private Map<FeedRange, KafkaCosmosChangeFeedState> getEffectiveFeedRangesContinu

FeedRangesMetadataTopicOffset feedRangesMetadataTopicOffset =
this.metadataReader
.getFeedRangesMetadataOffset(databaseName, containerProperties.getResourceId())
.getFeedRangesMetadataOffset(databaseName, containerProperties.getResourceId(), this.connectorName)
.block().v;

Map<FeedRange, KafkaCosmosChangeFeedState> effectiveFeedRangesContinuationMap = new LinkedHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class CosmosClientStore {
ACTIVE_DIRECTORY_ENDPOINT_MAP.put(CosmosAzureEnvironment.AZURE_GERMANY, "https://login.microsoftonline.de/");
}

public static CosmosAsyncClient getCosmosClient(CosmosAccountConfig accountConfig) {
public static CosmosAsyncClient getCosmosClient(CosmosAccountConfig accountConfig, String sourceName) {
if (accountConfig == null) {
return null;
}
Expand All @@ -38,7 +38,7 @@ public static CosmosAsyncClient getCosmosClient(CosmosAccountConfig accountConfi
new ThrottlingRetryOptions()
.setMaxRetryAttemptsOnThrottledRequests(Integer.MAX_VALUE)
.setMaxRetryWaitTime(Duration.ofSeconds((Integer.MAX_VALUE / 1000) - 1)))
.userAgentSuffix(getUserAgentSuffix(accountConfig));
.userAgentSuffix(getUserAgentSuffix(accountConfig, sourceName));

if (accountConfig.isUseGatewayMode()) {
cosmosClientBuilder.gatewayMode(new GatewayConnectionConfig().setMaxConnectionPoolSize(10000));
Expand All @@ -63,11 +63,16 @@ public static CosmosAsyncClient getCosmosClient(CosmosAccountConfig accountConfi
return cosmosClientBuilder.buildAsyncClient();
}

private static String getUserAgentSuffix(CosmosAccountConfig accountConfig) {
private static String getUserAgentSuffix(CosmosAccountConfig accountConfig, String sourceName) {
String userAgentSuffix = KafkaCosmosConstants.USER_AGENT_SUFFIX;
if (StringUtils.isNotEmpty(sourceName)) {
userAgentSuffix += "|" + sourceName;
}

if (StringUtils.isNotEmpty(accountConfig.getApplicationName())) {
return KafkaCosmosConstants.USER_AGENT_SUFFIX + "|" + accountConfig.getApplicationName();
userAgentSuffix += "|" + accountConfig.getApplicationName();
}

return KafkaCosmosConstants.USER_AGENT_SUFFIX;
return userAgentSuffix;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.kafka.connect.implementation.CosmosClientStore;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
import com.azure.cosmos.kafka.connect.implementation.CosmosThroughputControlHelper;
import com.azure.cosmos.kafka.connect.implementation.KafkaCosmosConstants;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
Expand Down Expand Up @@ -36,7 +36,8 @@ public String version() {
public void start(Map<String, String> props) {
LOGGER.info("Starting the kafka cosmos sink task");
this.sinkTaskConfig = new CosmosSinkTaskConfig(props);
this.cosmosClient = CosmosClientStore.getCosmosClient(this.sinkTaskConfig.getAccountConfig());
this.cosmosClient = CosmosClientStore.getCosmosClient(this.sinkTaskConfig.getAccountConfig(), this.sinkTaskConfig.getTaskId());
LOGGER.info("The taskId is " + this.sinkTaskConfig.getTaskId());
this.throughputControlClient = this.getThroughputControlCosmosClient();
this.sinkRecordTransformer = new SinkRecordTransformer(this.sinkTaskConfig);

Expand All @@ -59,7 +60,9 @@ private CosmosAsyncClient getThroughputControlCosmosClient() {
if (this.sinkTaskConfig.getThroughputControlConfig().isThroughputControlEnabled()
&& this.sinkTaskConfig.getThroughputControlConfig().getThroughputControlAccountConfig() != null) {
// throughput control is using a different database account config
return CosmosClientStore.getCosmosClient(this.sinkTaskConfig.getThroughputControlConfig().getThroughputControlAccountConfig());
return CosmosClientStore.getCosmosClient(
this.sinkTaskConfig.getThroughputControlConfig().getThroughputControlAccountConfig(),
this.sinkTaskConfig.getTaskId());
} else {
return this.cosmosClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,36 @@

package com.azure.cosmos.kafka.connect.implementation.sink;

import org.apache.kafka.common.config.ConfigDef;

import java.util.Map;

// Currently the sink task config shares the same config as sink connector config
public class CosmosSinkTaskConfig extends CosmosSinkConfig {
public CosmosSinkTaskConfig(Map<String, ?> parsedConfig) {
super(parsedConfig);
public static final String SINK_TASK_ID = "azure.cosmos.sink.task.id";
private final String taskId;

public CosmosSinkTaskConfig(Map<String, ?> parsedConfigs) {
super(getConfigDef(), parsedConfigs);
this.taskId = this.getString(SINK_TASK_ID);
}

public static ConfigDef getConfigDef() {
ConfigDef configDef = CosmosSinkConfig.getConfigDef();
defineTaskIdConfig(configDef);

return configDef;
}

private static void defineTaskIdConfig(ConfigDef result) {
result
.defineInternal(
SINK_TASK_ID,
ConfigDef.Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
ConfigDef.Importance.MEDIUM);
}

public String getTaskId() {
return taskId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,30 @@

public class ContainersMetadataTopicPartition {
public static final String DATABASE_NAME_KEY = "database";
public static final String CONNECTOR_NAME_KEY = "connectorName";

private final String databaseName;
private final String connectorName;

public ContainersMetadataTopicPartition(String databaseName) {
public ContainersMetadataTopicPartition(String databaseName, String connectorName) {
checkArgument(StringUtils.isNotEmpty(databaseName), "Argument 'databaseName' can not be null");

this.databaseName = databaseName;
this.connectorName = StringUtils.isEmpty(connectorName) ? "EMPTY" : connectorName;
}

public String getDatabaseName() {
return databaseName;
}

public String getConnectorName() {
return this.connectorName;
}

public static Map<String, Object> toMap(ContainersMetadataTopicPartition topicPartition) {
Map<String, Object> map = new HashMap<>();
map.put(DATABASE_NAME_KEY, topicPartition.getDatabaseName());
map.put(CONNECTOR_NAME_KEY, topicPartition.getConnectorName());
return map;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,17 @@ public void start(Map<String, String> map) {
LOGGER.info("Creating the cosmos client");

// TODO[GA]: optimize the client creation, client metadata cache?
this.cosmosClient = CosmosClientStore.getCosmosClient(this.taskConfig.getAccountConfig());
this.cosmosClient = CosmosClientStore.getCosmosClient(this.taskConfig.getAccountConfig(), this.taskConfig.getTaskId());
this.throughputControlCosmosClient = this.getThroughputControlCosmosClient();
}

private CosmosAsyncClient getThroughputControlCosmosClient() {
if (this.taskConfig.getThroughputControlConfig().isThroughputControlEnabled()
&& this.taskConfig.getThroughputControlConfig().getThroughputControlAccountConfig() != null) {
// throughput control is using a different database account config
return CosmosClientStore.getCosmosClient(this.taskConfig.getThroughputControlConfig().getThroughputControlAccountConfig());
return CosmosClientStore.getCosmosClient(
this.taskConfig.getThroughputControlConfig().getThroughputControlAccountConfig(),
this.taskConfig.getTaskId());
} else {
return this.cosmosClient;
}
Expand Down Expand Up @@ -145,7 +147,7 @@ private List<SourceRecord> executeMetadataTask(MetadataTaskUnit taskUnit) {
ContainersMetadataTopicOffset.toMap(containersMetadata.getRight()),
taskUnit.getStorageName(),
Schema.STRING_SCHEMA,
containersMetadata.getLeft().getDatabaseName(),
getContainersMetadataItemId(containersMetadata.getLeft().getDatabaseName(), containersMetadata.getLeft().getConnectorName()),
containersMetadataSchemaAndValue.schema(),
containersMetadataSchemaAndValue.value()));

Expand All @@ -162,7 +164,11 @@ private List<SourceRecord> executeMetadataTask(MetadataTaskUnit taskUnit) {
FeedRangesMetadataTopicOffset.toMap(feedRangesMetadata.getRight()),
taskUnit.getStorageName(),
Schema.STRING_SCHEMA,
feedRangesMetadata.getLeft().getDatabaseName() + "_" + feedRangesMetadata.getLeft().getContainerRid(),
this.getFeedRangesMetadataItemId(
feedRangesMetadata.getLeft().getDatabaseName(),
feedRangesMetadata.getLeft().getContainerRid(),
feedRangesMetadata.getLeft().getConnectorName()
),
feedRangeMetadataSchemaAndValue.schema(),
feedRangeMetadataSchemaAndValue.value()));
}
Expand All @@ -171,6 +177,13 @@ private List<SourceRecord> executeMetadataTask(MetadataTaskUnit taskUnit) {
return sourceRecords;
}

private String getContainersMetadataItemId(String databaseName, String connectorName) {
return databaseName + "_" + connectorName;
}
private String getFeedRangesMetadataItemId(String databaseName, String collectionRid, String connectorName) {
return databaseName + "_" + collectionRid + "_" + connectorName;
}

private Pair<List<SourceRecord>, Boolean> executeFeedRangeTask(FeedRangeTaskUnit feedRangeTaskUnit) {
CosmosAsyncContainer container =
this.cosmosClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@ public class CosmosSourceTaskConfig extends CosmosSourceConfig {

public static final String SOURCE_METADATA_TASK_UNIT = "azure.cosmos.source.task.metadataTaskUnit";
public static final String SOURCE_FEED_RANGE_TASK_UNITS = "azure.cosmos.source.task.feedRangeTaskUnits";
public static final String SOURCE_TASK_ID = "azure.cosmos.source.task.id";

private final List<FeedRangeTaskUnit> feedRangeTaskUnits;
private MetadataTaskUnit metadataTaskUnit;
private final MetadataTaskUnit metadataTaskUnit;
private final String taskId;

public CosmosSourceTaskConfig(Map<String, String> parsedConfigs) {
super(getConfigDef(), parsedConfigs);

this.feedRangeTaskUnits = this.parseFeedRangeTaskUnits();
this.metadataTaskUnit = this.parseMetadataTaskUnit();
this.taskId = this.getString(SOURCE_TASK_ID);
}

public static ConfigDef getConfigDef() {
Expand All @@ -52,6 +55,12 @@ private static void defineTaskUnitsConfig(ConfigDef result) {
ConfigDef.Type.STRING,
null,
ConfigDef.Importance.HIGH
)
.defineInternal(
SOURCE_TASK_ID,
ConfigDef.Type.STRING,
ConfigDef.NO_DEFAULT_VALUE,
ConfigDef.Importance.MEDIUM
);
}

Expand Down Expand Up @@ -135,4 +144,8 @@ public List<FeedRangeTaskUnit> getFeedRangeTaskUnits() {
public MetadataTaskUnit getMetadataTaskUnit() {
return metadataTaskUnit;
}

public String getTaskId() {
return taskId;
}
}
Loading

0 comments on commit 35514d9

Please sign in to comment.