欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 健康 > 美食 > windows C#-生成和使用异步流(下)

windows C#-生成和使用异步流(下)

2024/11/29 19:37:02 来源:https://blog.csdn.net/m0_72813396/article/details/144001781  浏览:    关键词:windows C#-生成和使用异步流(下)
异步流可提供更好的方法

异步流和关联语言支持解决了所有这些问题。 生成序列的代码现在可以使用 yield return 返回用 async 修饰符声明的方法中的元素。 可以通过 await foreach 循环来使用异步流,就像通过 foreach 循环使用任何序列一样。

这些新语言功能依赖于添加到 .NET Standard 2.1 并在 .NET Core 3.0 中实现的三个新接口:

System.Collections.Generic.IAsyncEnumerable<T>
System.Collections.Generic.IAsyncEnumerator<T>
System.IAsyncDisposable

大多数 C# 开发人员都应该熟悉这三个接口。 它们的行为方式类似于其对应的同步对象:

System.Collections.Generic.IEnumerable<T>
System.Collections.Generic.IEnumerator<T>
System.IDisposable

可能不熟悉的一种类型是 System.Threading.Tasks.ValueTask。 ValueTask 结构提供了与 System.Threading.Tasks.Task 类类似的 API。 出于性能方面的原因,这些接口中使用了 ValueTask。

转换为异步流

接下来,转换 RunPagedQueryAsync 方法以生成异步流。 首先,更改 RunPagedQueryAsync 的签名以返回 IAsyncEnumerable<JToken>,并从参数列表删除取消令牌和进度对象,如以下代码所示:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,string queryText, string repoName)

起始代码在检索页面时处理每个页面,如以下代码所示:

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

将这三行替换为以下代码:

foreach (JObject issue in issues(results)["nodes"]!)yield return issue;

还可以在此方法中删除前面的 finalResults 声明以及你修改的循环之后的 return 语句。

已完成更改以生成异步流。 已完成的方法应与以下代码类似:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,string queryText, string repoName)
{var issueAndPRQuery = new GraphQLRequest{Query = queryText};issueAndPRQuery.Variables["repo_name"] = repoName;bool hasMorePages = true;int pagesReturned = 0;int issuesReturned = 0;// Stop with 10 pages, because these are large repos:while (hasMorePages && (pagesReturned++ < 10)){var postBody = issueAndPRQuery.ToJsonText();var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),postBody, "application/json", "application/json");JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);int totalCount = (int)issues(results)["totalCount"]!;hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();issuesReturned += issues(results)["nodes"]!.Count();foreach (JObject issue in issues(results)["nodes"]!)yield return issue;}JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

接下来,将使用集合的代码更改为使用异步流。 在 Main 中找到以下处理问题集合的代码:

var progressReporter = new progressStatus((num) =>
{Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();try
{var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",cancellationSource.Token, progressReporter);foreach(var issue in results)Console.WriteLine(issue);
}
catch (OperationCanceledException)
{Console.WriteLine("Work has been cancelled");
}

将该代码替换为以下 await foreach 循环: 

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{Console.WriteLine(issue);Console.WriteLine($"Received {++num} issues in total");
}

新接口 IAsyncEnumerator<T> 派生自 IAsyncDisposable。 这意味着在循环完成时,前面的循环会以异步方式释放流。 可以假设循环类似于以下代码: 

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{while (await enumerator.MoveNextAsync()){var issue = enumerator.Current;Console.WriteLine(issue);Console.WriteLine($"Received {++num} issues in total");}
} finally
{if (enumerator != null)await enumerator.DisposeAsync();
}

默认情况下,在捕获的上下文中处理流元素。 如果要禁用上下文捕获,请使用 TaskAsyncEnumerableExtensions.ConfigureAwait 扩展方法。 

异步流支持使用与其他 async 方法相同的协议的取消。 要支持取消,请按如下所示修改异步迭代器方法的签名:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{var issueAndPRQuery = new GraphQLRequest{Query = queryText};issueAndPRQuery.Variables["repo_name"] = repoName;bool hasMorePages = true;int pagesReturned = 0;int issuesReturned = 0;// Stop with 10 pages, because these are large repos:while (hasMorePages && (pagesReturned++ < 10)){var postBody = issueAndPRQuery.ToJsonText();var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),postBody, "application/json", "application/json");JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);int totalCount = (int)issues(results)["totalCount"]!;hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();issuesReturned += issues(results)["nodes"]!.Count();foreach (JObject issue in issues(results)["nodes"]!)yield return issue;}JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

 System.Runtime.CompilerServices.EnumeratorCancellationAttribute 属性导致编译器生成 IAsyncEnumerator<T> 的代码,该代码使传递给 GetAsyncEnumerator 的令牌对作为该参数的异步迭代器的主体可见。 在 runQueryAsync 中,可以检查令牌的状态,并在请求时取消进一步的工作。

使用另一个扩展方法 WithCancellation,将取消标记传递给异步流。 可以按如下所示修改枚举问题的循环:

private static async Task EnumerateWithCancellation(GitHubClient client)
{int num = 0;var cancellation = new CancellationTokenSource();await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs").WithCancellation(cancellation.Token)){Console.WriteLine(issue);Console.WriteLine($"Received {++num} issues in total");}
}

可以从 asynchronous-programming/snippets 文件夹中的 dotnet/docs 存储库中获取已完成的文的代码。

运行完成的应用程序

再次运行该应用程序。 将其行为与初学者应用程序的行为进行对比。 会在结果的第一页可用立即对其进行枚举。 在请求和检索每个新页面时都会有一个可观察到的暂停,然后快速枚举下一页结果。 不需要 try / catch 块来处理取消:调用者可以停止枚举集合。 由于异步流在下载每个页面时生成结果,因此可以清楚地报告进度。 返回的每个问题的状态都无缝包含在 await foreach 循环中。 不需要回调对象即可跟踪进度。

通过检查代码,可以看到内存使用方面的改进。 不再需要在枚举所有结果之前分配一个集合来存储它们。 调用者可以决定如何使用结果,以及是否需要存储集合。

运行初学者应用程序和已完成的应用程序,可以自行观察实现之间的差异。 可以在完成本文后删除在开始学习本文时创建的 GitHub 访问令牌。 如果攻击者获得了对该令牌的访问权限,他们可以使用你的凭据来访问 GitHub API。

在本文中,你使用异步流从返回数据页的网络 API 读取单个项。 异步流还可以从股票行情自动收录器或传感器设备等“永不结束的流”读取内容。 对 MoveNextAsync 的调用将在下一项可用后立即返回它。

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com