PyProc 的 frame 结构分成了两个部分, 第一个部分是对应的帧头, 第二部是对应的 数据部分, 也就是对应的数据部分,类型下面这样

[header][payload]

这个数据结构在 Go 语言中表示为:

type Frame struct {
	Header  FrameHeader
	Payload []byte
}

整个数据帧格式分成了下面三个字段: 第一个字段是魔数, 用来验证对应的帧, 第二个字段是总共的帧长度,第三个是请求的ID, 同时CRC32C 表示对应的校验和

const (
    FrameHeaderSize = 18  // header总大小
    MagicByte1 = 0x50     // 魔数字节1 ('P')
    MagicByte2 = 0x59     // 魔数字节2 ('Y')
)

type FrameHeader struct {
    Magic [2]byte
    Length uint32
    RequestID uint32
    CRC32C    uint32
}

在 pyproc 里面还有另外一个结构,叫做 Framer 专门用来处理对应的流操作,它的定义如下:

type Framer struct {
    rw           io.ReadWriter  // 底层流
    maxFrameSize int            // 安全限制
    enhancedMode bool           // 协议模式
}

那这里为什么使用 io.ReadWriter 这个类型来表示对应的流, 如果比较熟悉 Go 语言的话,在 net 包中有一个核心的抽象叫做 net.Conn, 它是接口类型 这个接口类型其实是 io.ReadWriter 的 子接口实现。

在 pyproc 中有一个核心的数据结构是 Pool, 这个 Pool 主要用来管理我们的 Go 语言程序和 Python 之间的通信,

type Pool struct {
  opts     PoolOptions     // 配置选项
  logger   *Logger         // 日志记录器
  workers  []*poolWorker   // workers 池子
  nextIdx  atomic.Uint64   // 用于轮询调度的索引
  shutdown atomic.Bool     // 关闭状态标志
  wg       sync.WaitGroup  // 等待组,用于优雅关闭
  semaphore chan struct{}  // 信号量,用于背压控制
  healthMu sync.RWMutex    // 健康状态锁
  healthStatus HealthStatus // 健康状态
  healthCancel context.CancelFunc // 健康监控取消函数
  activeRequests map[uint64]*activeRequest // 活动请求跟踪
  activeRequestsMu sync.RWMutex // 活动请求锁
}

在这个数据结构中, 还存在另外一个数据结构叫做 poolWorker, 就是这个 pool 中放的工作线程:

type poolWorker struct {
	worker    *Worker
	connPool  chan net.Conn
	requestID atomic.Uint64
	healthy   atomic.Bool
}

在 Pool 的初始化阶段, 会根据配置的 work 数量来去开启对应的 work, 同时我们也会初始化对应的 work 的状态为 healthy 状态, 如果某个 work 启动失败,我们将会去回滚已经启动的线程

if err := pw.worker.Start(ctx); err != nil {
    // 错误处理:停止已启动的线程
    for j := 0; j < i; j++ {
           _ = p.workers[j].worker.Stop()
    }
    return fmt.Errorf("failed to start worker %d: %w", i, err)
}

Pool 有一个 call 方法会选择一个 work 来执行, 这种选择方式采用的是轮询的方式

// input 是我们的请求参数
Call(ctx context.Context, method string, input any, output any) error

当轮询到对应的 work 的时候, 从这个 work 中的 connPool 中读取一个连接, 如果 work 中的 pool 没有对应的连接, 那就创建一个连接,之后就是发送对应的消息,在发送消息之前会给这个请求加上一个 requestID 然后会调用framer 相关的方法去发送对应的消息。