controller.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. // Copyright 2025 The frp Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package vnet
  15. import (
  16. "context"
  17. "encoding/base64"
  18. "fmt"
  19. "io"
  20. "net"
  21. "sync"
  22. "github.com/fatedier/golib/pool"
  23. "github.com/songgao/water/waterutil"
  24. "golang.org/x/net/ipv4"
  25. "golang.org/x/net/ipv6"
  26. v1 "github.com/fatedier/frp/pkg/config/v1"
  27. "github.com/fatedier/frp/pkg/util/log"
  28. "github.com/fatedier/frp/pkg/util/xlog"
  29. )
  30. const (
  31. maxPacketSize = 1420
  32. )
  33. type Controller struct {
  34. addr string
  35. tun io.ReadWriteCloser
  36. clientRouter *clientRouter // Route based on destination IP (client mode)
  37. serverRouter *serverRouter // Route based on source IP (server mode)
  38. }
  39. func NewController(cfg v1.VirtualNetConfig) *Controller {
  40. return &Controller{
  41. addr: cfg.Address,
  42. clientRouter: newClientRouter(),
  43. serverRouter: newServerRouter(),
  44. }
  45. }
  46. func (c *Controller) Init() error {
  47. tunDevice, err := OpenTun(context.Background(), c.addr)
  48. if err != nil {
  49. return err
  50. }
  51. c.tun = tunDevice
  52. return nil
  53. }
  54. func (c *Controller) Run() error {
  55. conn := c.tun
  56. for {
  57. buf := pool.GetBuf(maxPacketSize)
  58. n, err := conn.Read(buf)
  59. if err != nil {
  60. pool.PutBuf(buf)
  61. log.Warnf("vnet read from tun error: %v", err)
  62. return err
  63. }
  64. c.handlePacket(buf[:n])
  65. pool.PutBuf(buf)
  66. }
  67. }
  68. // handlePacket processes a single packet. The caller is responsible for managing the buffer.
  69. func (c *Controller) handlePacket(buf []byte) {
  70. log.Tracef("vnet read from tun [%d]: %s", len(buf), base64.StdEncoding.EncodeToString(buf))
  71. var src, dst net.IP
  72. switch {
  73. case waterutil.IsIPv4(buf):
  74. header, err := ipv4.ParseHeader(buf)
  75. if err != nil {
  76. log.Warnf("parse ipv4 header error: %v", err)
  77. return
  78. }
  79. src = header.Src
  80. dst = header.Dst
  81. log.Tracef("%s >> %s %d/%-4d %-4x %d",
  82. header.Src, header.Dst,
  83. header.Len, header.TotalLen, header.ID, header.Flags)
  84. case waterutil.IsIPv6(buf):
  85. header, err := ipv6.ParseHeader(buf)
  86. if err != nil {
  87. log.Warnf("parse ipv6 header error: %v", err)
  88. return
  89. }
  90. src = header.Src
  91. dst = header.Dst
  92. log.Tracef("%s >> %s %d %d",
  93. header.Src, header.Dst,
  94. header.PayloadLen, header.TrafficClass)
  95. default:
  96. log.Tracef("unknown packet, discarded(%d)", len(buf))
  97. return
  98. }
  99. targetConn, err := c.clientRouter.findConn(dst)
  100. if err == nil {
  101. if err := WriteMessage(targetConn, buf); err != nil {
  102. log.Warnf("write to client target conn error: %v", err)
  103. }
  104. return
  105. }
  106. targetConn, err = c.serverRouter.findConnBySrc(dst)
  107. if err == nil {
  108. if err := WriteMessage(targetConn, buf); err != nil {
  109. log.Warnf("write to server target conn error: %v", err)
  110. }
  111. return
  112. }
  113. log.Tracef("no route found for packet from %s to %s", src, dst)
  114. }
  115. func (c *Controller) Stop() error {
  116. return c.tun.Close()
  117. }
  118. // Client connection read loop
  119. func (c *Controller) readLoopClient(ctx context.Context, conn io.ReadWriteCloser) {
  120. xl := xlog.FromContextSafe(ctx)
  121. defer func() {
  122. // Remove the route when read loop ends (connection closed)
  123. c.clientRouter.removeConnRoute(conn)
  124. conn.Close()
  125. }()
  126. for {
  127. data, err := ReadMessage(conn)
  128. if err != nil {
  129. xl.Warnf("client read error: %v", err)
  130. return
  131. }
  132. if len(data) == 0 {
  133. continue
  134. }
  135. switch {
  136. case waterutil.IsIPv4(data):
  137. header, err := ipv4.ParseHeader(data)
  138. if err != nil {
  139. xl.Warnf("parse ipv4 header error: %v", err)
  140. continue
  141. }
  142. xl.Tracef("%s >> %s %d/%-4d %-4x %d",
  143. header.Src, header.Dst,
  144. header.Len, header.TotalLen, header.ID, header.Flags)
  145. case waterutil.IsIPv6(data):
  146. header, err := ipv6.ParseHeader(data)
  147. if err != nil {
  148. xl.Warnf("parse ipv6 header error: %v", err)
  149. continue
  150. }
  151. xl.Tracef("%s >> %s %d %d",
  152. header.Src, header.Dst,
  153. header.PayloadLen, header.TrafficClass)
  154. default:
  155. xl.Tracef("unknown packet, discarded(%d)", len(data))
  156. continue
  157. }
  158. xl.Tracef("vnet write to tun (client) [%d]: %s", len(data), base64.StdEncoding.EncodeToString(data))
  159. _, err = c.tun.Write(data)
  160. if err != nil {
  161. xl.Warnf("client write tun error: %v", err)
  162. }
  163. }
  164. }
  165. // Server connection read loop
  166. func (c *Controller) readLoopServer(ctx context.Context, conn io.ReadWriteCloser, onClose func()) {
  167. xl := xlog.FromContextSafe(ctx)
  168. defer func() {
  169. // Clean up all IP mappings associated with this connection when it closes
  170. c.serverRouter.cleanupConnIPs(conn)
  171. // Call the provided callback upon closure
  172. if onClose != nil {
  173. onClose()
  174. }
  175. conn.Close()
  176. }()
  177. for {
  178. data, err := ReadMessage(conn)
  179. if err != nil {
  180. xl.Warnf("server read error: %v", err)
  181. return
  182. }
  183. if len(data) == 0 {
  184. continue
  185. }
  186. // Register source IP to connection mapping
  187. if waterutil.IsIPv4(data) || waterutil.IsIPv6(data) {
  188. var src net.IP
  189. if waterutil.IsIPv4(data) {
  190. header, err := ipv4.ParseHeader(data)
  191. if err == nil {
  192. src = header.Src
  193. c.serverRouter.registerSrcIP(src, conn)
  194. }
  195. } else {
  196. header, err := ipv6.ParseHeader(data)
  197. if err == nil {
  198. src = header.Src
  199. c.serverRouter.registerSrcIP(src, conn)
  200. }
  201. }
  202. }
  203. xl.Tracef("vnet write to tun (server) [%d]: %s", len(data), base64.StdEncoding.EncodeToString(data))
  204. _, err = c.tun.Write(data)
  205. if err != nil {
  206. xl.Warnf("server write tun error: %v", err)
  207. }
  208. }
  209. }
  210. // RegisterClientRoute registers a client route (based on destination IP CIDR)
  211. // and starts the read loop
  212. func (c *Controller) RegisterClientRoute(ctx context.Context, name string, routes []net.IPNet, conn io.ReadWriteCloser) {
  213. c.clientRouter.addRoute(name, routes, conn)
  214. go c.readLoopClient(ctx, conn)
  215. }
  216. // UnregisterClientRoute Remove client route from routing table
  217. func (c *Controller) UnregisterClientRoute(name string) {
  218. c.clientRouter.delRoute(name)
  219. }
  220. // StartServerConnReadLoop starts the read loop for a server connection
  221. // (dynamically associates with source IPs)
  222. func (c *Controller) StartServerConnReadLoop(ctx context.Context, conn io.ReadWriteCloser, onClose func()) {
  223. go c.readLoopServer(ctx, conn, onClose)
  224. }
  225. // ParseRoutes Convert route strings to IPNet objects
  226. func ParseRoutes(routeStrings []string) ([]net.IPNet, error) {
  227. routes := make([]net.IPNet, 0, len(routeStrings))
  228. for _, r := range routeStrings {
  229. _, ipNet, err := net.ParseCIDR(r)
  230. if err != nil {
  231. return nil, fmt.Errorf("parse route %s error: %v", r, err)
  232. }
  233. routes = append(routes, *ipNet)
  234. }
  235. return routes, nil
  236. }
  237. // Client router (based on destination IP routing)
  238. type clientRouter struct {
  239. routes map[string]*routeElement
  240. mu sync.RWMutex
  241. }
  242. func newClientRouter() *clientRouter {
  243. return &clientRouter{
  244. routes: make(map[string]*routeElement),
  245. }
  246. }
  247. func (r *clientRouter) addRoute(name string, routes []net.IPNet, conn io.ReadWriteCloser) {
  248. r.mu.Lock()
  249. defer r.mu.Unlock()
  250. r.routes[name] = &routeElement{
  251. name: name,
  252. routes: routes,
  253. conn: conn,
  254. }
  255. }
  256. func (r *clientRouter) findConn(dst net.IP) (io.Writer, error) {
  257. r.mu.RLock()
  258. defer r.mu.RUnlock()
  259. for _, re := range r.routes {
  260. for _, route := range re.routes {
  261. if route.Contains(dst) {
  262. return re.conn, nil
  263. }
  264. }
  265. }
  266. return nil, fmt.Errorf("no route found for destination %s", dst)
  267. }
  268. func (r *clientRouter) delRoute(name string) {
  269. r.mu.Lock()
  270. defer r.mu.Unlock()
  271. delete(r.routes, name)
  272. }
  273. func (r *clientRouter) removeConnRoute(conn io.Writer) {
  274. r.mu.Lock()
  275. defer r.mu.Unlock()
  276. for name, re := range r.routes {
  277. if re.conn == conn {
  278. delete(r.routes, name)
  279. return
  280. }
  281. }
  282. }
  283. // Server router (based solely on source IP routing)
  284. type serverRouter struct {
  285. srcIPConns map[string]io.Writer // Source IP string to connection mapping
  286. mu sync.RWMutex
  287. }
  288. func newServerRouter() *serverRouter {
  289. return &serverRouter{
  290. srcIPConns: make(map[string]io.Writer),
  291. }
  292. }
  293. func (r *serverRouter) findConnBySrc(src net.IP) (io.Writer, error) {
  294. r.mu.RLock()
  295. defer r.mu.RUnlock()
  296. conn, exists := r.srcIPConns[src.String()]
  297. if !exists {
  298. return nil, fmt.Errorf("no route found for source %s", src)
  299. }
  300. return conn, nil
  301. }
  302. func (r *serverRouter) registerSrcIP(src net.IP, conn io.Writer) {
  303. key := src.String()
  304. r.mu.RLock()
  305. existingConn, ok := r.srcIPConns[key]
  306. r.mu.RUnlock()
  307. // If the entry exists and the connection is the same, no need to do anything.
  308. if ok && existingConn == conn {
  309. return
  310. }
  311. // Acquire write lock to update the map.
  312. r.mu.Lock()
  313. defer r.mu.Unlock()
  314. // Double-check after acquiring the write lock to handle potential race conditions.
  315. existingConn, ok = r.srcIPConns[key]
  316. if ok && existingConn == conn {
  317. return
  318. }
  319. r.srcIPConns[key] = conn
  320. }
  321. // cleanupConnIPs removes all IP mappings associated with the specified connection
  322. func (r *serverRouter) cleanupConnIPs(conn io.Writer) {
  323. r.mu.Lock()
  324. defer r.mu.Unlock()
  325. // Find and delete all IP mappings pointing to this connection
  326. for ip, mappedConn := range r.srcIPConns {
  327. if mappedConn == conn {
  328. delete(r.srcIPConns, ip)
  329. }
  330. }
  331. }
  332. type routeElement struct {
  333. name string
  334. routes []net.IPNet
  335. conn io.ReadWriteCloser
  336. }