io.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. // Copyright 2017 fatedier, fatedier@gmail.com
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package io
  15. import (
  16. "io"
  17. "sync"
  18. "github.com/golang/snappy"
  19. "github.com/fatedier/frp/utils/crypto"
  20. "github.com/fatedier/frp/utils/pool"
  21. )
  22. // Join two io.ReadWriteCloser and do some operations.
  23. func Join(c1 io.ReadWriteCloser, c2 io.ReadWriteCloser) (inCount int64, outCount int64) {
  24. var wait sync.WaitGroup
  25. pipe := func(to io.ReadWriteCloser, from io.ReadWriteCloser, count *int64) {
  26. defer to.Close()
  27. defer from.Close()
  28. defer wait.Done()
  29. buf := pool.GetBuf(16 * 1024)
  30. defer pool.PutBuf(buf)
  31. *count, _ = io.CopyBuffer(to, from, buf)
  32. }
  33. wait.Add(2)
  34. go pipe(c1, c2, &inCount)
  35. go pipe(c2, c1, &outCount)
  36. wait.Wait()
  37. return
  38. }
  39. func WithEncryption(rwc io.ReadWriteCloser, key []byte) (io.ReadWriteCloser, error) {
  40. w, err := crypto.NewWriter(rwc, key)
  41. if err != nil {
  42. return nil, err
  43. }
  44. return WrapReadWriteCloser(crypto.NewReader(rwc, key), w, func() error {
  45. return rwc.Close()
  46. }), nil
  47. }
  48. func WithCompression(rwc io.ReadWriteCloser) io.ReadWriteCloser {
  49. return WrapReadWriteCloser(snappy.NewReader(rwc), snappy.NewWriter(rwc), func() error {
  50. return rwc.Close()
  51. })
  52. }
  53. type ReadWriteCloser struct {
  54. r io.Reader
  55. w io.Writer
  56. closeFn func() error
  57. }
  58. func WrapReadWriteCloser(r io.Reader, w io.Writer, closeFn func() error) io.ReadWriteCloser {
  59. return &ReadWriteCloser{
  60. r: r,
  61. w: w,
  62. closeFn: closeFn,
  63. }
  64. }
  65. func (rwc *ReadWriteCloser) Read(p []byte) (n int, err error) {
  66. return rwc.r.Read(p)
  67. }
  68. func (rwc *ReadWriteCloser) Write(p []byte) (n int, err error) {
  69. return rwc.w.Write(p)
  70. }
  71. func (rwc *ReadWriteCloser) Close() (errRet error) {
  72. var err error
  73. if rc, ok := rwc.r.(io.Closer); ok {
  74. err = rc.Close()
  75. if err != nil {
  76. errRet = err
  77. }
  78. }
  79. if wc, ok := rwc.w.(io.Closer); ok {
  80. err = wc.Close()
  81. if err != nil {
  82. errRet = err
  83. }
  84. }
  85. if rwc.closeFn != nil {
  86. err = rwc.closeFn()
  87. if err != nil {
  88. errRet = err
  89. }
  90. }
  91. return
  92. }