public class gRPCWatcher {
/**
* grpc channel
*/
private final gRPCChannel gRPCChannel;
/**
* 当前重连的次数
*/
private int hasReconnectTimes = 0;
/**
* 最大重连次数
*/
private final int maxReconnectTimes = 3;
/**
* 构造函数
*/
public gRPCWatcher(gRPCChannel gRPCChannel) {
this.gRPCChannel = gRPCChannel;
}
public void startWatch(Request confRequest, WatchHandler watchHandler) {
StreamObserver<ConfResponse> streamObserver = new StreamObserver<ConfResponse>() {
@Override
public void onNext(ConfResponse confResponse) {
try {
watchHandler.process(confResponse);
} catch (Exception e) {
//...
}
hasReconnectTimes = 0;
}
@Override
public void onError(Throwable throwable) {
//error 的时候尝试重新调用,超过最大次数失败抛出异常
if (hasReconnectTimes < maxReconnectTimes) {
hasReconnectTimes++;
startWatch(confRequest, watchHandler);
} else {
throw new RuntimeException("reWatch "+maxReconnectTimes+" times still error" + throwable.getMessage());
}
}
@Override
public void onCompleted() {
//...
}
};
ConfigGrpc.newStub(gRPCChannel.getNettyChannel()).withWaitForReady().watch(confRequest, streamObserver);
}
}
这是一段处理服务端流的 grpc 调用的逻辑,当网络出问题等异常情况的时候,会进入 onError, grpc channel 底层自带重连重试机制,所以我们只要重新 stub call 就行了。
所以我在 onError 的时候重新调用该方法,为了防止无限调,所以加了以这个最大重试次数,不知道这样写优雅不优雅- -
关于 grpc 的重连 可以看官方的讨论,他们也是推荐 channel 可以复用 主要 stub 重新 call https://groups.google.com/g/grpc-io/c/quToVM4NhdQ
1
0576coder OP you 老哥指点一下么- -
|
2
yidongnan 2021-08-20 15:09:38 +08:00
Java 用户使用 grpc,推荐下 https://github.com/yidongnan/grpc-spring-boot-starter
|