什么是Socket?

  • 即套接字,是一个对 TCP / IP协议进行封装 的编程调用接口(API)

Socket通信模型

Socket 通信步骤

大致的通信步骤可以根据上图的通信模型来编写代码,大致如下步骤:

  1. 创建 ServerSocket 和 Socket
  2. 打开连接到的 Socket 的输入 / 输出流
  3. 按照协议对 Socket 进行读 / 写操作
  4. 关闭输入输出流,以及 Socket

ServiceSocket 和 Socket 通信

接下来,我们先以一个最简单的通信例子来看看 socket 是如何进行通信的,后面我们再来看看长连接是如何实现的,首先先创建 serviceSocket 服务端

  • Socket 服务端
    1. 创建 ServerSocket 对象,绑定监听的端口
    2. 调用 accept() 方法监听客户端的请求(注:该方法如果客户端一直未连接,会被阻塞)
    3. 连接建立后,通过输入流读取客户端发送的请求信息
    4. 通过输出流向客户端发送响应信息
    5. 关闭相关资源

服务端实现代码如下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
fun main() {
    // 指定端口号
    val serviceSocket = ServerSocket(12377)

    println("开始连接...")
    val socket = serviceSocket.accept() // 会发生阻塞
    println("已连接--${socket.localAddress.hostAddress}:${socket.localPort}")

    // 获取客户端发送过来的消息
    val inputStream = socket.getInputStream()

    val inn = InputStreamReader(inputStream)
    val bufferedReader = BufferedReader(inn)
    var data = ""
    while (bufferedReader.readLine()?.also { data = it } != null) {
        println("客户端发送过来的信息: $data")
    }

    println("关闭连接")
    socket.shutdownInput()
    socket.close()
}
  • Socket客户端的编写

    1. 创建 Socket 对象,指明需要链接的服务器的地址和端号
    2. 链接建立后,通过输出流向服务器发送请求信息
    3. 通过输出流获取服务器响应的信息
    4. 关闭相关资源

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      
      fun main() {
      // 连接服务端
      val clientSocket = Socket("127.0.0.1", 12377)
      
      // 获取输出流, 和服务端进行通信
      val outputStream = clientSocket.getOutputStream()
      // 将输出流包装成打印流
      val printWriter = PrintWriter(outputStream)
      printWriter.write("hello world")
      printWriter.flush()
      clientSocket.shutdownOutput() //关闭输出流
      clientSocket.close()
      }

好了, 服务端和客户端通信的 socket 最基础的通信就是这些了。所有的源码会在文章最后放出,建议可以单独运行一下,接下来看下长连接是如何实现的

socket 长连接

实现长连接之前,我们要先了解一些几个概念

TCP 长连接

  1. TCP 连接建立后只要不关闭,逻辑上连接一直存在
  2. TCP 是有保活定时器的,可以打开保活定时器来维持长连接,设置 SO_KEEPALIVE 才会开启,时间间隔默认 7200s ,也就是 2h ,这个默认是关闭的
  3. HTTP 中的 keep-alive 和 TCP 中的 keep-alive 的原理不一样
    • HTTP 中的 keep-alive 主要是:比如浏览器中,一个网站中可能会对当前域名请求多次不同的页面,那么这里的 keep-alive 就是保持这个 TCP 连接在第一次访问该域名就建立了这个 TCP 连接,后续再访问不同的页面就不需要再经过 3 次握手了。因为默认情况下不加这个报文(Connection:keep-alive)一次 TCP 连接建立,请求网络后就结束了,这个 TCP 连接也就结束了。
    • TCP 中的 keep-alive 是用来探测连接的对端是否存活。他的时长是 2h,但是由于 NAT 超时和网络状态(切换网络,断网)可能会发生变化,所以该属性无法报活

NAT 超时

因为 IPv4 的 IP 量有限,运营商分配给手机终端的 IP 是运营商内网的 IP,手机要连接 Internet,就需要通过运营商的网关做一个网络地址转换(Network Address Translation,NAT)。简单的说运营商的网关需要维护一个外网 IP、端口到内网 IP、端口的对应关系,以确保内网的手机可以跟 Internet 的服务器通讯。

大部分移动无线网络运营商都在链路一段时间没有数据通讯时,会淘汰 NAT 表中的对应项,造成链路中断。

长连接心跳间隔必须要小于NAT超时时间(aging-time),如果超过 aging-time 不做心跳,TCP 长连接链路就会中断,Server 就无法发送Push 给手机,只能等到客户端下次心跳失败后,重建连接才能取到消息。

心跳包

  • 心跳的原因:虽然理论tcp连接后一直不断,但实际上会断网,比如 NAT 超时,断网重开等等
  • 心跳包的主要作用是告知对方连接端,我还活着,心还在跳,保持活跃状态
  • 心跳时长多少? 现实是残酷的, 根据网上的一些说法, 中移动 2/3G 下, NAT 超时时间为 5 分钟, 中国电信 3G 则大于 28 分钟, 理想的情况下, 客户端应当以略小于NAT超时时间的间隔来发送心跳包。
地区/网络 NAT超时时间
中国移动3G和2G 5分钟
中国联通2G 5分钟
中国电信3G 大于28分钟
美国3G 大于28分钟
台湾3G 大于28分钟

WiFi 下,NAT 超时时间都会比较长,据说宽带的网关一般没有空闲释放机制, GCM 有些时候在 WiFi 下的心跳比在移动网络下的心跳要快,可能是因为 WiFi 下联网通信耗费的电量比移动网络下小。

这里再做下简述,默认情况下,TCP 建立连接之后,只要不关闭,这个连接就会一直存在,但是假设一种场景,比如用户手机在前一个小时使用了网络进行通讯,然后后面便一直未再使用过了,那么这个 TCP 连接将会一直存在,其实是比较耗费资源的,那么运营商弄了一个 NAT 超时,如上表超时时间所示,只要超过了这个时间,并未发生任何数据流量的通信,那么将会主动断开这个连接,所以,我们建立长连接,小于 NAT 超时时间的时候发送心跳包,其实长链接的核心本质就是防止 NAT 超时。

可能存在的风险及预防措施

  • 网络状态发生变化(移动网络、WiFi 网络切换、断开、重连)
  • 解决方案: 断线重连机制

不可抗拒因素

  • DHCP (Dynamic Host Configuration Protocol

简单的来说,DHCP 是一个局域网协议,使用 UDP 协议进行工作,它的作用就是动态的分配 IP 地址,Gateway 地址,DNS 服务器地址等信息,一旦租约到期,那么路由器就会把当前的这个 ip 分配给其他设备使用,所以,对于设备而言要定期请求 DHCP Server 来更新 ip 地址信息,保证 ip 地址有效可用。一般常见的情况下,window 连接网络的时候就是动态分配 ip 的

会遇到的问题:如果遇到 DHCP 过期,那么设备仍然会使用旧的 ip,对于 Tcp 连接来说,使用旧的过期 ip 就意味着连接不到远程服务器,当 TCP 使用过期的 ip 去连接远程服务器的时候会报异常:java.net.NoRouteToHostException: No route to host,意思是说没有可达 Host 的路由。设备连接无线网是连接到路由器上的,而路由器上分配给设备的 ip 已经过期不可用,那么设备到路由器的链路是通的,但是路由器到远程主机的链路肯定是不通的,所以会报如上错误

长连接源码实现

socket 长连接可能需要注意的地方就是这些了,接下来看下源码如何实现长连接,注意我在这里是开启了多进程

服务端 ServerSocket 实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
fun main() {
    // 指定端口号
    val serviceSocket = ServerSocket(12377)

    // 会发生阻塞
    println("开始连接...")
    val socket = serviceSocket.accept()
    println("已连接--${socket.localAddress.hostAddress}:${socket.localPort}")

    // 获取客户端发送过来的消息
    val inputStream = socket.getInputStream()
    val inn = InputStreamReader(inputStream)
    val bufferedReader = BufferedReader(inn)
    var data = ""

    // 回复客户端
    val outputStream = socket.getOutputStream()

    while (bufferedReader.readLine()?.also { data = it } != null) {
        println("客户端发送过来的信息: $data")

        when (data) {
            "HeartBeat" -> {
                // 检测到心跳包
                outputStream.write("ok".toByteArray())
                outputStream.flush()
            }

            else -> {
                // 回复给客户端
                val df: DateFormat = SimpleDateFormat("HH:mm:ss")
                println("${df.format(Date())}: 收到消息")
                outputStream.write("${df.format(Date())}: $data".toByteArray())
                outputStream.flush()
            }
        }
    }

    println("关闭连接")
    socket.shutdownInput()
    socket.close()
}

客户端 socket 通信连接

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
/**
 * desc: 维护长连接的服务
 *
 * 1. 该 service 需要开启多进程, 需要考虑进程报活
 * 2. 需要考虑 NAT 超时情况, 需要维护长连接
 * 3. 可以理解该服务就是连接远程服务端 socket 并和远程服务端进行消息通信
 *
 * @author midFang
 *
 */
class RemoteSocketService : Service() {

  	// 多进程下使用的 List,防止移除失败
    private val remoteCallbackList by lazy { RemoteCallbackList<IMessageCallback>() }

    private var sendTime = 0L
    private var mSocket: WeakReference<Socket>? = null
    private var mReaderThread: ReaderThread? = null


    private val mHandler = Handler(Looper.getMainLooper())

    /**心跳任务,不断重复检测心跳 **/
    private val heartBeatRunnable = object : Runnable {
        override fun run() {
            if (System.currentTimeMillis() - sendTime >= HEART_BEAT_RATE) {
                val isSuccess = sendMsg("HeartBeat") // 发送心跳
                if (!isSuccess) {
                    // 如果发送失败,就重新建立 socket
                    mHandler.removeCallbacks(this)
                    mReaderThread?.release()
                    releaseLastSocket(mSocket)
                    InitSocketThread().start()
                }
            }
            mHandler.postDelayed(this, HEART_BEAT_RATE)
        }
    }

    companion object {
        /**
         * 可设置比 NAT 超时的时间少, 一般各大运营商的时间都各不相同
         * 长链接的核心本质就是避免 NAT 超时时间
         * 因为即使 TCP 的时间并没有中断, 但是还是会因为通信双方并发发生任何数据交互,从而运行商会主动关闭连接,这个就是 NAT 超时
         */
        private const val HEART_BEAT_RATE = 3 * 1000.toLong()

        // 本机电脑的 ip
        private var HOST: String = "192.168.1.172"
        private var PORT: Int = 12377

        /**
         * 连接服务
         */
        fun connect(context: Context, serviceConnection: ServiceConnection) {
            Intent(context.applicationContext, RemoteSocketService::class.java).also {
                context.applicationContext.bindService(it,serviceConnection,Context.BIND_AUTO_CREATE)
            }
        }
    }


    /**
     * 客户端调用服务端的方法.
     */
    private val mRemoteService = object : ISocket.Stub() {
        /**
         * @throws RemoteSocketService
         */
        override fun sendMessage(message: String?) = sendMsg(message)
        override fun addMessageCallback(back: IMessageCallback?) {
            remoteCallbackList.register(back)
        }

        override fun removeMessageCallback(back: IMessageCallback?) {
            remoteCallbackList.unregister(back)
        }
    }

    override fun onBind(intent: Intent?): IBinder? = mRemoteService

    /**
     * 大致的步骤有:
     * 1. 连接 socket 服务
     * 2. 解析数据
     * 3. 返回给外部数据
     */
    override fun onCreate() {
        super.onCreate()
        // 开始连接 socket 服务
        InitSocketThread().start()
    }

    inner class InitSocketThread : Thread() {
        override fun run() {
            super.run()
            initSocket()
        }
    }

    override fun onDestroy() {
        super.onDestroy()
        mHandler.removeCallbacks(heartBeatRunnable)
        mReaderThread?.release()
        releaseLastSocket(mSocket)
    }

    private fun initSocket() {
        println("端口$HOST:$PORT")
        val socket = Socket(HOST, PORT)
        // 2 小时内探测对方, 并无法保证 NAT 超时,也无法保活
//        socket.keepAlive = true

        mSocket = WeakReference(socket)
        // 开启读写线程
        mReaderThread = ReaderThread(socket)
        mReaderThread?.start()

        mHandler.postDelayed(heartBeatRunnable, HEART_BEAT_RATE)
    }

    inner class ReaderThread(socket: Socket) : Thread() {

        private var isStart = true
        private val mWeakSocket by lazy { WeakReference<Socket>(socket) }

        fun release() {
            isStart = false
            releaseLastSocket(mWeakSocket)
        }

        override fun run() {
            super.run()

            kotlin.runCatching {
                mWeakSocket.get()?.let { socket ->
                    val inputStream = socket.getInputStream()
                    val byteArray = ByteArray(1024 * 4)
                    var length = 0

                    while (!socket.isClosed && !socket.isInputShutdown && isStart
                        && inputStream.read(byteArray).also { length = it } != -1
                    ) {
                        if (length > 0) {
                            // 解析数据
                            val message = String(byteArray.copyOf(length))
                            println("接收到的消息 $message")

                            // 回复数据
                            val size = remoteCallbackList.beginBroadcast()
                            repeat(size) {
                                val item = remoteCallbackList.getBroadcastItem(it)
                                item.receiverMessage(message)
                            }
                            remoteCallbackList.finishBroadcast()
                        }
                    }
                }
            }

        }
    }

    private fun releaseLastSocket(weakSocket: WeakReference<Socket>?) {
        kotlin.runCatching {
            var get = weakSocket?.get()
            if (get?.isClosed == true) {
                get?.close()
            }
            get = null
        }
    }

    /**
     * 发送消息
     */
    private fun sendMsg(message: String?): Boolean {
        if (mSocket == null || mSocket?.get() == null) return false

        mSocket?.get()?.let { socket ->

            if (!socket.isClosed && !socket.isOutputShutdown) {
                thread {
                    val outputStream = socket.getOutputStream()
                    val message: String = message + "\r\n"
                    kotlin.runCatching {
                        outputStream.write(message.toByteArray())
                        outputStream.flush()
                    }
                }

                sendTime = System.currentTimeMillis() // 每次发送成数据,就改一下最后成功发送的时间,节省心跳间隔时间

            } else return false
        }

        return true
    }
}

主要流程的源码实现就是这些了,具体细节部分可以看下工程项目。

源码地址: https://github.com/midFang/blogSource/tree/main/SocketDemo

引用链接

https://blog.csdn.net/yzpbright/article/details/113721914