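When applications are deployed globally, each local deployment should access local data, so the database has to synchronize worldwide and accept writes at multiple masters. This is hard to achieve in the traditional model: both synchronization and performance are constrained by the network, storage, and data consistency. Cosmos DB addresses these problems. Consider an information publishing system with millions or tens of millions of users worldwide and billions of articles. Such a system inevitably ends up globally distributed, with each user served by the nearest site. The demo below implements this architecture. The model is as follows.
Suppose three sites are created in Southeast Asia, North Europe, and West US to serve traffic from around the world. Users in Southeast Asia write to and read from the Southeast Asia database. Likewise, North Europe users read and write in North Europe, and West US users read and write in West US.
Cosmos DB design
Create one Cosmos DB account in each of Southeast Asia, North Europe, and West US, configured as follows
Create the model
This article simulates an article publishing page. Start by creating the article model. For how to build the application itself, see: https://docs.microsoft.com/zh-cn/azure/cosmos-db/tutorial-develop-sql-api-dotnet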
using Newtonsoft.Json;

public class Article
{
    // Unique ID for the article
    [JsonProperty(PropertyName = "id")]
    public string Id { get; set; }

    // Read-only partition key value; mirrors Id
    public string PartitionKey
    {
        get
        {
            return this.Id;
        }
    }

    // Author of the article
    public string Author { get; set; }

    // Category/genre of the article
    public string Category { get; set; }

    // Title of the article
    public string Title { get; set; }
}
Controller. Only listing and creation are simulated here; a delete method is written but not used.
using System.Threading.Tasks;
using System.Web.Mvc;

public class ArticleController : Controller
{
    // GET: Article
    [ActionName("Index")]
    public async Task<ActionResult> IndexAsync()
    {
        var items = await DocumentDBRepository<Article>.GetListAsync();
        return View(items);
    }

    // The GET action has no await, so suppress warning CS1998 around it.
#pragma warning disable 1998
    [ActionName("Create")]
    public async Task<ActionResult> CreateAsync()
    {
        return View();
    }
#pragma warning restore 1998

    [HttpPost]
    [ActionName("Create")]
    [ValidateAntiForgeryToken]
    public async Task<ActionResult> CreateAsync([Bind(Include = "Id,Category,Title,Author")] Article item)
    {
        if (ModelState.IsValid)
        {
            await DocumentDBRepository<Article>.CreateAsync(item);
            return RedirectToAction("Index");
        }
        return View(item);
    }

    [HttpPost]
    [ActionName("Delete")]
    [ValidateAntiForgeryToken]
    public async Task<ActionResult> DeleteConfirmedAsync([Bind(Include = "Id")] string id)
    {
        await DocumentDBRepository<Article>.DeleteAsync(id);
        return RedirectToAction("Index");
    }
}
Data access layer
Create a class in DocumentDBRepository.cs
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.Linq;

public static class DocumentDBRepository<T> where T : class
{
    private static readonly string DatabaseId = ConfigurationManager.AppSettings["database"];
    //private static readonly string CollectionIdArticle = "Article";
    // private static DocumentClient client;

    // One client per regional account: this site writes through writeClient
    // and only reads through the other two.
    private static DocumentClient writeClient;
    private static DocumentClient readClient1;
    private static DocumentClient readClient2;

    public static void Initialize()
    {
        // client = new DocumentClient(new Uri(ConfigurationManager.AppSettings["endpoint"]), ConfigurationManager.AppSettings["authKey"]);
        ConnectionPolicy ClientPolicyAsia = new ConnectionPolicy { ConnectionMode = ConnectionMode.Direct, ConnectionProtocol = Protocol.Tcp };
        ClientPolicyAsia.PreferredLocations.Add(LocationNames.SoutheastAsia);
        ClientPolicyAsia.PreferredLocations.Add(LocationNames.WestUS);
        ClientPolicyAsia.PreferredLocations.Add(LocationNames.NorthEurope);

        ConnectionPolicy ClientPolicyNorthEurope = new ConnectionPolicy { ConnectionMode = ConnectionMode.Direct, ConnectionProtocol = Protocol.Tcp };
        ClientPolicyNorthEurope.PreferredLocations.Add(LocationNames.NorthEurope);
        ClientPolicyNorthEurope.PreferredLocations.Add(LocationNames.WestUS);
        ClientPolicyNorthEurope.PreferredLocations.Add(LocationNames.SoutheastAsia);

        ConnectionPolicy ClientPolicyUS = new ConnectionPolicy { ConnectionMode = ConnectionMode.Direct, ConnectionProtocol = Protocol.Tcp };
        ClientPolicyUS.PreferredLocations.Add(LocationNames.WestUS);
        ClientPolicyUS.PreferredLocations.Add(LocationNames.SoutheastAsia);
        ClientPolicyUS.PreferredLocations.Add(LocationNames.NorthEurope);

        // "key" is a placeholder for each account's auth key.
        readClient2 = new DocumentClient(
            new Uri("https://contentdatabase-usa.documents.azure.com:443/"),
            "key",
            ClientPolicyUS);
        writeClient = new DocumentClient(
            new Uri("https://contentdatabase-asia.documents.azure.com:443"),
            "key",
            ClientPolicyAsia);
        readClient1 = new DocumentClient(
            new Uri("https://contentdatabase-europe.documents.azure.com:443/"),
            "key",
            ClientPolicyNorthEurope);

        CreateDatabaseIfNotExistsAsync().Wait();
        CreateCollectionIfNotExistsAsync("Article").Wait();
        // CreateCollectionIfNotExistsAsync("Review").Wait();
    }

    private static async Task CreateDatabaseIfNotExistsAsync()
    {
        // Make sure the database exists in each of the three regional accounts.
        foreach (DocumentClient client in new[] { writeClient, readClient1, readClient2 })
        {
            try
            {
                await client.ReadDatabaseAsync(UriFactory.CreateDatabaseUri(DatabaseId));
            }
            catch (DocumentClientException e)
            {
                if (e.StatusCode == System.Net.HttpStatusCode.NotFound)
                {
                    await client.CreateDatabaseAsync(new Database { Id = DatabaseId });
                }
                else
                {
                    throw;
                }
            }
        }
    }

    private static async Task CreateCollectionIfNotExistsAsync(string collectionname)
    {
        // Create the collection (for example Article) in each of the three regional accounts.
        foreach (DocumentClient client in new[] { writeClient, readClient1, readClient2 })
        {
            try
            {
                await client.ReadDocumentCollectionAsync(UriFactory.CreateDocumentCollectionUri(DatabaseId, collectionname));
            }
            catch (DocumentClientException e)
            {
                if (e.StatusCode == System.Net.HttpStatusCode.NotFound)
                {
                    // Partition the collection on /id with 1000 RU/s.
                    DocumentCollection myCollection = new DocumentCollection();
                    myCollection.Id = collectionname;
                    myCollection.PartitionKey.Paths.Add("/id");
                    await client.CreateDocumentCollectionAsync(
                        UriFactory.CreateDatabaseUri(DatabaseId),
                        myCollection,
                        new RequestOptions { OfferThroughput = 1000 });
                }
                else
                {
                    throw;
                }
            }
        }
    }

    public static async Task<IEnumerable<T>> GetListAsync()
    {
        // Merge the articles from all three regions.
        // The collection is named after the model type (for example "Article").
        Uri collectionUri = UriFactory.CreateDocumentCollectionUri(DatabaseId, typeof(T).Name);
        // The collection is partitioned on /id, so an unfiltered query must fan out across partitions.
        FeedOptions options = new FeedOptions { EnableCrossPartitionQuery = true };

        IDocumentQuery<T> writequery = writeClient.CreateDocumentQuery<T>(collectionUri, options).AsDocumentQuery();
        IDocumentQuery<T> readquery1 = readClient1.CreateDocumentQuery<T>(collectionUri, options).AsDocumentQuery();
        IDocumentQuery<T> readquery2 = readClient2.CreateDocumentQuery<T>(collectionUri, options).AsDocumentQuery();

        List<T> resultsList = new List<T>();
        while (writequery.HasMoreResults || readquery1.HasMoreResults || readquery2.HasMoreResults)
        {
            // Request the next page from every account that still has results, in parallel.
            IList<Task<FeedResponse<T>>> results = new List<Task<FeedResponse<T>>>();
            if (writequery.HasMoreResults)
            {
                results.Add(writequery.ExecuteNextAsync<T>());
            }
            if (readquery1.HasMoreResults)
            {
                results.Add(readquery1.ExecuteNextAsync<T>());
            }
            if (readquery2.HasMoreResults)
            {
                results.Add(readquery2.ExecuteNextAsync<T>());
            }
            IList<FeedResponse<T>> FeedResult = await Task.WhenAll(results);
            foreach (FeedResponse<T> feed in FeedResult)
            {
                resultsList.AddRange(feed);
            }
        }
        return resultsList;
    }

    public static async Task<Document> CreateAsync(T item)
    {
        // New documents always go to this site's local (write) account.
        return await writeClient.CreateDocumentAsync(UriFactory.CreateDocumentCollectionUri(DatabaseId, typeof(T).Name), item);
    }

    public static async Task DeleteAsync(string id)
    {
        // The collection is partitioned on /id, so the partition key of a document is its id.
        // Only documents stored in this site's own account can be deleted this way.
        await writeClient.DeleteDocumentAsync(
            UriFactory.CreateDocumentUri(DatabaseId, typeof(T).Name, id),
            new RequestOptions { PartitionKey = new PartitionKey(id) });
    }
}
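As the code above shows, three DocumentClient instances are initialized, each pointing to the Cosmos DB account in one of the three regions.
Create the sites
Create a site in each of the three regions.
West US site: the DocumentClient instances are initialized as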
    writeClient = new DocumentClient(
        new Uri("https://contentdatabase-usa.documents.azure.com:443/"),
        "key",
        ClientPolicyUS);
    readClient1 = new DocumentClient(
        new Uri("https://contentdatabase-asia.documents.azure.com:443"),
        "key",
        ClientPolicyAsia);
    readClient2 = new DocumentClient(
        new Uri("https://contentdatabase-europe.documents.azure.com:443/"),
        "key",
        ClientPolicyNorthEurope);
North Europe site: the DocumentClient instances are initialized as
    readClient2 = new DocumentClient(
        new Uri("https://contentdatabase-usa.documents.azure.com:443/"),
        "key",
        ClientPolicyUS);
    readClient1 = new DocumentClient(
        new Uri("https://contentdatabase-asia.documents.azure.com:443"),
        "key",
        ClientPolicyAsia);
    writeClient = new DocumentClient(
        new Uri("https://contentdatabase-europe.documents.azure.com:443/"),
        "key",
        ClientPolicyNorthEurope);
Southeast Asia site: the DocumentClient instances are initialized as
    readClient1 = new DocumentClient(
        new Uri("https://contentdatabase-usa.documents.azure.com:443/"),
        "key",
        ClientPolicyUS);
    writeClient = new DocumentClient(
        new Uri("https://contentdatabase-asia.documents.azure.com:443"),
        "key",
        ClientPolicyAsia);
    readClient2 = new DocumentClient(
        new Uri("https://contentdatabase-europe.documents.azure.com:443/"),
        "key",
        ClientPolicyNorthEurope);
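The three site configurations differ only in which account acts as the write client. As a rough sketch, that choice could be derived from a single per-site setting instead of editing the code for each deployment. The names RegionRoles, RegionRouting, and ForSite, and the region keys used here, are hypothetical and not part of the original demo or of the Cosmos DB SDK; only the endpoint URLs come from the article.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical holder for the outcome: one local write endpoint, the rest read-only.
public sealed class RegionRoles
{
    public string WriteEndpoint { get; }
    public IReadOnlyList<string> ReadEndpoints { get; }

    public RegionRoles(string writeEndpoint, IReadOnlyList<string> readEndpoints)
    {
        WriteEndpoint = writeEndpoint;
        ReadEndpoints = readEndpoints;
    }
}

public static class RegionRouting
{
    // Endpoints as listed in the article; the dictionary keys are an assumption.
    private static readonly Dictionary<string, string> Endpoints = new Dictionary<string, string>
    {
        ["Southeast Asia"] = "https://contentdatabase-asia.documents.azure.com:443",
        ["North Europe"] = "https://contentdatabase-europe.documents.azure.com:443/",
        ["West US"] = "https://contentdatabase-usa.documents.azure.com:443/",
    };

    // The site's own region becomes the write account; every other region is read-only.
    public static RegionRoles ForSite(string localRegion)
    {
        string writeEndpoint;
        if (!Endpoints.TryGetValue(localRegion, out writeEndpoint))
        {
            throw new ArgumentException("Unknown region: " + localRegion, nameof(localRegion));
        }

        List<string> readEndpoints = Endpoints
            .Where(pair => pair.Key != localRegion)
            .Select(pair => pair.Value)
            .ToList();

        return new RegionRoles(writeEndpoint, readEndpoints);
    }
}
```

With something like this, Initialize could read the local region from configuration and build writeClient from WriteEndpoint and the two read clients from ReadEndpoints, so all three sites would share one code base.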
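Merging data
Because there are three separate Cosmos DB accounts, the article list has to be merged from all of them. The merge is done by this snippet from the data access layer: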
    public static async Task<IEnumerable<T>> GetListAsync()
    {
        // Merge the articles from all three regions.
        // The collection is named after the model type (for example "Article").
        Uri collectionUri = UriFactory.CreateDocumentCollectionUri(DatabaseId, typeof(T).Name);
        // The collection is partitioned on /id, so an unfiltered query must fan out across partitions.
        FeedOptions options = new FeedOptions { EnableCrossPartitionQuery = true };

        IDocumentQuery<T> writequery = writeClient.CreateDocumentQuery<T>(collectionUri, options).AsDocumentQuery();
        IDocumentQuery<T> readquery1 = readClient1.CreateDocumentQuery<T>(collectionUri, options).AsDocumentQuery();
        IDocumentQuery<T> readquery2 = readClient2.CreateDocumentQuery<T>(collectionUri, options).AsDocumentQuery();

        List<T> resultsList = new List<T>();
        while (writequery.HasMoreResults || readquery1.HasMoreResults || readquery2.HasMoreResults)
        {
            // Request the next page from every account that still has results, in parallel.
            IList<Task<FeedResponse<T>>> results = new List<Task<FeedResponse<T>>>();
            if (writequery.HasMoreResults)
            {
                results.Add(writequery.ExecuteNextAsync<T>());
            }
            if (readquery1.HasMoreResults)
            {
                results.Add(readquery1.ExecuteNextAsync<T>());
            }
            if (readquery2.HasMoreResults)
            {
                results.Add(readquery2.ExecuteNextAsync<T>());
            }
            IList<FeedResponse<T>> FeedResult = await Task.WhenAll(results);
            foreach (FeedResponse<T> feed in FeedResult)
            {
                resultsList.AddRange(feed);
            }
        }
        return resultsList;
    }
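To see the merge loop in isolation, here is a minimal sketch that runs without a Cosmos DB account. PagedSource<T> is a hypothetical in-memory stand-in for IDocumentQuery<T>, and RegionMerger is likewise an illustrative name; neither exists in the SDK. The loop has the same shape as GetListAsync: while any source still has pages, it requests one page from each non-exhausted source concurrently and appends the results.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for IDocumentQuery<T>: serves its items one page at a time.
public sealed class PagedSource<T>
{
    private readonly Queue<List<T>> pages;

    public PagedSource(IEnumerable<T> items, int pageSize)
    {
        // Split the items into consecutive pages of at most pageSize elements.
        pages = new Queue<List<T>>(
            items.Select((item, index) => new { item, index })
                 .GroupBy(x => x.index / pageSize)
                 .Select(group => group.Select(x => x.item).ToList()));
    }

    public bool HasMoreResults
    {
        get { return pages.Count > 0; }
    }

    public Task<List<T>> ExecuteNextAsync()
    {
        return Task.FromResult(pages.Dequeue());
    }
}

public static class RegionMerger
{
    // Drains every source, fetching one page per non-exhausted source in each round.
    public static async Task<List<T>> MergeAsync<T>(params PagedSource<T>[] sources)
    {
        var merged = new List<T>();
        while (sources.Any(source => source.HasMoreResults))
        {
            // ToList() starts each page request exactly once before awaiting them together.
            List<Task<List<T>>> pending = sources
                .Where(source => source.HasMoreResults)
                .Select(source => source.ExecuteNextAsync())
                .ToList();

            foreach (List<T> page in await Task.WhenAll(pending))
            {
                merged.AddRange(page);
            }
        }
        return merged;
    }
}
```

Note that the merged list is ordered by fetch round and source, not by any field of the articles, so a real listing page would probably sort the result before displaying it.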
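Results
Visiting the Southeast Asia site shows four articles in total.
Three of them are stored in that site's Cosmos DB account; the fourth is in the West US Cosmos DB account.
Visiting the Europe site shows the same result, even though the Europe Cosmos DB account contains no data.
When one site writes an article, the other sites can display it within milliseconds. In this way, by choosing a suitable partition key and using static, account-based partitioning, Azure Cosmos DB can provide multi-region local writes and reads. That is, a web site is deployed in each region, and each site reads and writes its local data; the other regions work the same way. The Cosmos DB accounts act as static partitions, Cosmos DB delivers the data within milliseconds, and the application layer merges it. The result is an application model with multi-master reads and writes that spans regions and is distributed globally.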