欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 财经 > 创投人物 > 60、Flink 的异步 IO 算子使用异步 Http 客户端查高德地图

60、Flink 的异步 IO 算子使用异步 Http 客户端查高德地图

2024/10/23 23:33:56 来源:https://blog.csdn.net/m0_50186249/article/details/140034804  浏览:    关键词:60、Flink 的异步 IO 算子使用异步 Http 客户端查高德地图

1、概述

  Http 异步客户端设置:并行度=2,capacity=2,HttpMaxConn=2,client 为静态输入:同时发起4条查询输出:间隔10秒,同时返回4条数据JDBC 线程池+链接池设置:并行度=2,capacity=2,HttpMaxConn=2,client 为静态输入:同时发起4条查询输出:间隔10秒,先返回两条数据,间隔10秒,再返回两条数据

2、代码示例

package com.xu.flink.datastream.day11;import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.util.concurrent.Executors;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.util.EntityUtils;import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.concurrent.TimeUnit;public class _06_HttpAsyncQueryGaoDe {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);AsyncDataStream.unorderedWait(lines, new AsyncHttpQueryFunction(), 20, TimeUnit.SECONDS, 2).print();env.execute();}
}/*** Http 异步客户端* 设置:并行度=2,capacity=2,HttpMaxConn=2,client 为静态* 输入:同时发起4条查询* 输出:间隔10秒,同时返回4条数据* <p>* JDBC 线程池+链接池* 设置:并行度=2,capacity=2,HttpMaxConn=2,client 为静态* 输入:同时发起4条查询* 输出:间隔10秒,先返回两条数据,间隔10秒,再返回两条数据*/
class AsyncHttpQueryFunction extends RichAsyncFunction<String, _06_OrderBean> {private static final String key = "***";private static CloseableHttpAsyncClient httpClient;@Overridepublic void open(Configuration parameters) throws Exception {//创建异步 HttpClient 连接池,并初始化异步的 HttpClientRequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(3000).setConnectTimeout(3000).build();httpClient = HttpAsyncClients.custom().setMaxConnTotal(2).setDefaultRequestConfig(requestConfig).build();System.out.println("open 方法被调用");System.out.println("httpClient=>" + httpClient.hashCode());httpClient.start();}@Overridepublic void asyncInvoke(String line, ResultFuture<_06_OrderBean> resultFuture) throws Exception {try {// 1、解析JSON,获取经纬度信息_06_OrderBean orderBean = JSON.parseObject(line, _06_OrderBean.class);double longitude = orderBean.getLongitude();double latitude = orderBean.getLatitude();// 创建 httpGet 请求HttpGet httpGet = new HttpGet("https://restapi.amap.com/v3/geocode/regeo?&location=" + longitude + "," + latitude + "&key=" + key);// 2、通过 httpClient 提交异步请求,获取 future 对象// callback 是回调函数(也可通过回调函数拿结果)// 注意:此处使用 task 线程,如果此处是异步提交,则不会阻塞;如果此处是同步提交,则会阻塞;Future<HttpResponse> future = httpClient.execute(httpGet, null);// 3、从成功的 Future 中取数据,返回 orderBean// 使用 Executors.directExecutor() 获取返回结果// 注意:此处为异步获取返回结果,会使用单独的线程池,即使用 get() 方法,也不会阻塞 task 线程CompletableFuture.supplyAsync(new Supplier<_06_OrderBean>() {@Overridepublic _06_OrderBean get() {try {HttpResponse response = future.get();String province = null;String city = null;String district = null;if (response.getStatusLine().getStatusCode() == 200) {//拿出响应的实例对象HttpEntity entity = response.getEntity();JSONObject jsonObject = JSON.parseObject(EntityUtils.toString(entity));JSONObject regeocodeObject = jsonObject.getJSONObject("regeocode");if (regeocodeObject != null && !regeocodeObject.isEmpty()) {JSONObject addObject = regeocodeObject.getJSONObject("addressComponent");district = addObject.getString("district");city = addObject.getString("city");province = addObject.getString("province");}}orderBean.setProvince(province);orderBean.setCity(city);orderBean.setDistrict(district);return orderBean;} catch (Exception e) {return null;}}}, Executors.directExecutor()).thenAccept(new Consumer<_06_OrderBean>() {@Overridepublic void accept(_06_OrderBean resultOrderBean) {resultFuture.complete(Collections.singleton(resultOrderBean));}});} catch (Exception e) {resultFuture.complete(Collections.singleton(null));}}@Overridepublic void timeout(String input, ResultFuture<_06_OrderBean> resultFuture) throws Exception {resultFuture.completeExceptionally(new RuntimeException(input + "=查询超时"));}@Overridepublic void close() throws Exception {httpClient.close();}
}
-----------------------------------------------------------------
@Data
@NoArgsConstructor
@AllArgsConstructor
public class _06_OrderBean {private String oid;private String uid;private double money;private double longitude;private double latitude;private String province;private String city;private String district;@Overridepublic String toString() {return "OrderBean{" +"oid='" + oid + '\'' +", uid='" + uid + '\'' +", money=" + money +", longitude=" + longitude +", latitude=" + latitude +", province='" + province + '\'' +", city='" + city + '\'' +", district='" + district + '\'' +'}';}
}

3、测试用例

{"oid":"o001","uid":"u001","money":99.99,"longitude":115.690417, "latitude":36.239344}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com