You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

424 lines
8.0 KiB

package hue2
import (
"bufio"
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"git.aiterp.net/lucifer/new-server/internal/lerrors"
"io"
"log"
"net"
"net/http"
"strings"
"time"
)
// NewClient creates a Client for the bridge at host that authenticates with
// token.
//
// The client's channel works as a pool of request slots: it is created with
// room for five entries, but only two are placed in it, so the request helpers
// that take a slot before sending run at most two at a time.
func NewClient(host, token string) *Client {
	client := &Client{
		host:  host,
		token: token,
		ch:    make(chan struct{}, 5),
	}

	// Seed the pool with its two slots.
	client.ch <- struct{}{}
	client.ch <- struct{}{}

	return client
}
// Client sends requests to a single bridge.
type Client struct {
	// host is the bridge address used to build request URLs; token is the
	// application key sent along with requests.
	host, token string

	// ch holds the request slots. Request helpers receive one entry before
	// sending and put it back when they return.
	ch chan struct{}
}
// Register asks the bridge to create a new user for this application and
// returns the username from the response. On success the username is also
// stored as the client's token, so later requests are authenticated with it.
//
// While the bridge answers with an empty list or an error entry, the request
// is repeated once per second until ctx is done, in which case ctx.Err() is
// returned. (Presumably the error means the bridge has not yet been put into
// pairing mode; confirm against the bridge documentation.) A response entry
// with neither an error nor a success part yields
// lerrors.ErrUnexpectedResponse. Errors from the request itself are returned
// as-is without retrying.
func (c *Client) Register(ctx context.Context) (string, error) {
	// Retry in a loop rather than by recursion so that a long wait does not
	// keep growing the call stack.
	for {
		result := make([]CreateUserResponse, 0, 1)
		err := c.post(ctx, "api/", CreateUserInput{DeviceType: "git.aiterp.net/lucifer"}, &result)
		if err != nil {
			return "", err
		}

		if len(result) > 0 && result[0].Error == nil {
			if result[0].Success == nil {
				return "", lerrors.ErrUnexpectedResponse
			}

			c.token = result[0].Success.Username
			return result[0].Success.Username, nil
		}

		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-time.After(time.Second):
			// Try again.
		}
	}
}
// AllResources requests "clip/v2/resource" and returns the entries decoded
// into the response's Data field. Any error from the request or from decoding
// is returned unchanged; the decoded Error field is not inspected.
func (c *Client) AllResources(ctx context.Context) ([]ResourceData, error) {
	var payload struct {
		Error interface{}
		Data  []ResourceData
	}

	if err := c.get(ctx, "clip/v2/resource", &payload); err != nil {
		return nil, err
	}

	return payload.Data, nil
}
// Resources requests "clip/v2/resource/<kind>" and returns the entries decoded
// into the response's Data field. kind is appended to the path as given. Any
// error from the request or from decoding is returned unchanged; the decoded
// Error field is not inspected.
func (c *Client) Resources(ctx context.Context, kind string) ([]ResourceData, error) {
	endpoint := "clip/v2/resource/" + kind

	var envelope struct {
		Error interface{}
		Data  []ResourceData
	}

	if err := c.get(ctx, endpoint, &envelope); err != nil {
		return nil, err
	}

	return envelope.Data, nil
}
// UpdateResource sends update as the body of a PUT request to the path of
// link. The response body is not decoded.
func (c *Client) UpdateResource(ctx context.Context, link ResourceLink, update ResourceUpdate) error {
	path := link.Path()

	return c.put(ctx, path, update, nil)
}
// DeleteResource hands the path of link to the client's delete helper. The
// response body is not decoded.
func (c *Client) DeleteResource(ctx context.Context, link ResourceLink) error {
	path := link.Path()

	return c.delete(ctx, path, nil)
}
// LegacyDiscover sends a POST without a body to the legacy API resource named
// by kind. The response body is not decoded.
func (c *Client) LegacyDiscover(ctx context.Context, kind string) error {
	var (
		noBody   interface{}
		noTarget interface{}
	)

	return c.legacyPost(ctx, kind, noBody, noTarget)
}
// SSE opens the bridge's event stream and returns a channel on which the
// decoded updates are delivered.
//
// A goroutine connects to the stream, reads it line by line and, for every
// "data: ..." line, decodes the payload as a JSON list of SSEUpdate and sends
// each element on the channel. Lines without a ": " separator and lines with
// other field names are skipped.
//
// The channel is closed when the goroutine ends, which happens when
// connecting fails, reading fails (including when ctx is cancelled), a data
// payload cannot be parsed, or ctx is done while an update is waiting to be
// delivered. Failures are logged, not returned.
func (c *Client) SSE(ctx context.Context) <-chan SSEUpdate {
	ch := make(chan SSEUpdate, 4)

	go func() {
		defer close(ch)

		// No leading slash here: getReader already puts one between host
		// and path.
		reader, err := c.getReader(ctx, "eventstream/clip/v2", map[string]string{
			"Accept": "text/event-stream",
		})
		if err != nil {
			log.Println("SSE Connect error:", err)
			return
		}
		defer reader.Close()

		br := bufio.NewReader(reader)
		for {
			line, err := br.ReadString('\n')
			if err != nil {
				log.Println("SSE Read error:", err)
				return
			}

			line = strings.Trim(line, "  \t\r\n")
			kv := strings.SplitN(line, ": ", 2)
			if len(kv) < 2 || kv[0] != "data" {
				continue
			}

			var data []SSEUpdate
			if err := json.Unmarshal([]byte(kv[1]), &data); err != nil {
				log.Println("Parsing SSE event failed:", err)
				log.Println("  json:", kv[1])
				return
			}

			for _, obj := range data {
				// Do not block forever on a consumer that stopped reading:
				// give up as soon as ctx is done so the goroutine and the
				// connection are released.
				select {
				case ch <- obj:
				case <-ctx.Done():
					return
				}
			}
		}
	}()

	return ch
}
// getReader performs a GET request for path on the bridge over HTTPS and
// returns the response body for the caller to read and close.
//
// It first waits for a free request slot (or for ctx to end, returning
// ctx.Err()); the slot is handed back when getReader returns, not when the
// body is closed. Every entry of headers is set on the request, followed by
// the "hue-application-key" header carrying the client's token. Requests whose
// Accept header is "text/event-stream" go through sseClient, all others
// through httpClient.
//
// Any status other than 200 closes the body and is returned as an error whose
// message is the response status line.
func (c *Client) getReader(ctx context.Context, path string, headers map[string]string) (io.ReadCloser, error) {
	select {
	case <-c.ch:
		defer func() { c.ch <- struct{}{} }()
	case <-ctx.Done():
		return nil, ctx.Err()
	}

	endpoint := fmt.Sprintf("https://%s/%s", c.host, path)
	req, err := http.NewRequest("GET", endpoint, nil)
	if err != nil {
		return nil, err
	}

	for name, val := range headers {
		req.Header.Set(name, val)
	}
	req.Header.Set("hue-application-key", c.token)

	doer := httpClient
	if accept, ok := headers["Accept"]; ok && accept == "text/event-stream" {
		doer = sseClient
	}

	resp, err := doer.Do(req.WithContext(ctx))
	if err != nil {
		return nil, err
	}

	if resp.StatusCode == http.StatusOK {
		return resp.Body, nil
	}

	_ = resp.Body.Close()
	return nil, errors.New(resp.Status)
}
// get performs a GET request for path and decodes the JSON response body into
// target, which must be a non-nil pointer. When target is nil the body is
// closed without being read.
func (c *Client) get(ctx context.Context, path string, target interface{}) error {
	body, err := c.getReader(ctx, path, map[string]string{})
	if err != nil {
		return err
	}
	defer body.Close()

	if target == nil {
		return nil
	}

	// target already holds the destination pointer, so it is handed to the
	// decoder directly. Passing &target would make a non-pointer target
	// decode into a throwaway value without reporting any error.
	return json.NewDecoder(body).Decode(target)
}
// put sends body in a PUT request to path on the bridge over HTTPS and, when
// target is non-nil, decodes the JSON response body into it. target must then
// be a pointer.
//
// It waits for a free request slot first and returns ctx.Err() if ctx ends
// before one is available; the slot is handed back when put returns. body is
// converted with reqBody. The "hue-application-key" header always carries the
// client's token. The HTTP status code of the response is not inspected.
func (c *Client) put(ctx context.Context, path string, body interface{}, target interface{}) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-c.ch:
		defer func() {
			c.ch <- struct{}{}
		}()
	}

	rb, err := reqBody(body)
	if err != nil {
		return err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPut, fmt.Sprintf("https://%s/%s", c.host, path), rb)
	if err != nil {
		return err
	}
	req.Header.Set("hue-application-key", c.token)

	res, err := httpClient.Do(req)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if target == nil {
		return nil
	}

	// target already holds the destination pointer, so it is handed to the
	// decoder directly. Passing &target would make a non-pointer target
	// decode into a throwaway value without reporting any error.
	return json.NewDecoder(res.Body).Decode(target)
}
// post sends body in a POST request to path on the bridge over HTTPS and,
// when target is non-nil, decodes the JSON response body into it. target must
// then be a pointer.
//
// It waits for a free request slot first and returns ctx.Err() if ctx ends
// before one is available; the slot is handed back when post returns. body is
// converted with reqBody. The "hue-application-key" header is only set when
// the client has a token, so the call also works before one has been
// obtained. The HTTP status code of the response is not inspected.
func (c *Client) post(ctx context.Context, path string, body interface{}, target interface{}) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-c.ch:
		defer func() {
			c.ch <- struct{}{}
		}()
	}

	rb, err := reqBody(body)
	if err != nil {
		return err
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("https://%s/%s", c.host, path), rb)
	if err != nil {
		return err
	}
	if c.token != "" {
		req.Header.Set("hue-application-key", c.token)
	}

	res, err := httpClient.Do(req)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if target == nil {
		return nil
	}

	// target already holds the destination pointer, so it is handed to the
	// decoder directly. Passing &target would make a non-pointer target
	// decode into a throwaway value without reporting any error.
	return json.NewDecoder(res.Body).Decode(target)
}
// delete sends a DELETE request without a body to path on the bridge over
// HTTPS and, when target is non-nil, decodes the JSON response body into it.
// target must then be a pointer.
//
// It waits for a free request slot first and returns ctx.Err() if ctx ends
// before one is available; the slot is handed back when delete returns. The
// "hue-application-key" header always carries the client's token. The HTTP
// status code of the response is not inspected.
func (c *Client) delete(ctx context.Context, path string, target interface{}) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-c.ch:
		defer func() {
			c.ch <- struct{}{}
		}()
	}

	// The method has to be DELETE; a body-less PUT would not remove anything.
	req, err := http.NewRequestWithContext(ctx, http.MethodDelete, fmt.Sprintf("https://%s/%s", c.host, path), nil)
	if err != nil {
		return err
	}
	req.Header.Set("hue-application-key", c.token)

	res, err := httpClient.Do(req)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if target == nil {
		return nil
	}

	// target already holds the destination pointer, so it is handed to the
	// decoder directly. Passing &target would make a non-pointer target
	// decode into a throwaway value without reporting any error.
	return json.NewDecoder(res.Body).Decode(target)
}
// legacyPost sends body in a POST request to "http://<host>/api/<resource>"
// and, when target is non-nil, decodes the JSON response body into it.
//
// When the client has a token, it is inserted into the URL in front of
// resource ("<token>/<resource>"); no authentication header is set. The
// request goes over plain HTTP. As with the other request helpers, a request
// slot is taken first (returning ctx.Err() if ctx ends before one is free)
// and handed back on return, and the response status code is not inspected.
func (c *Client) legacyPost(ctx context.Context, resource string, body interface{}, target interface{}) error {
	select {
	case <-c.ch:
		defer func() { c.ch <- struct{}{} }()
	case <-ctx.Done():
		return ctx.Err()
	}

	payload, err := reqBody(body)
	if err != nil {
		return err
	}

	apiPath := resource
	if c.token != "" {
		apiPath = c.token + "/" + resource
	}

	endpoint := fmt.Sprintf("http://%s/api/%s", c.host, apiPath)
	req, err := http.NewRequest("POST", endpoint, payload)
	if err != nil {
		return err
	}

	resp, err := httpClient.Do(req.WithContext(ctx))
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if target != nil {
		return json.NewDecoder(resp.Body).Decode(target)
	}
	return nil
}
// legacyDelete sends a DELETE request without a body to
// "http://<host>/api/<resource>" and, when target is non-nil, decodes the JSON
// response body into it.
//
// When the client has a token, it is inserted into the URL in front of
// resource ("<token>/<resource>"); no authentication header is set. The
// request goes over plain HTTP. A request slot is taken first (returning
// ctx.Err() if ctx ends before one is free) and handed back on return. The
// response status code is not inspected.
func (c *Client) legacyDelete(ctx context.Context, resource string, target interface{}) error {
	select {
	case <-c.ch:
		defer func() { c.ch <- struct{}{} }()
	case <-ctx.Done():
		return ctx.Err()
	}

	apiPath := resource
	if c.token != "" {
		apiPath = c.token + "/" + resource
	}

	endpoint := fmt.Sprintf("http://%s/api/%s", c.host, apiPath)
	req, err := http.NewRequest("DELETE", endpoint, nil)
	if err != nil {
		return err
	}

	resp, err := httpClient.Do(req.WithContext(ctx))
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if target != nil {
		return json.NewDecoder(resp.Body).Decode(target)
	}
	return nil
}
// reqBody turns body into a reader suitable as an HTTP request body.
//
//   - nil yields a nil reader (no request body);
//   - a []byte or string is wrapped in a reader over its contents;
//   - an io.Reader is returned unchanged;
//   - anything else is marshalled to JSON, and a marshalling error is
//     returned to the caller.
func reqBody(body interface{}) (io.Reader, error) {
	switch payload := body.(type) {
	case nil:
		return nil, nil
	case []byte:
		return bytes.NewReader(payload), nil
	case string:
		return strings.NewReader(payload), nil
	case io.Reader:
		return payload, nil
	}

	encoded, err := json.Marshal(body)
	if err != nil {
		return nil, err
	}

	return bytes.NewReader(encoded), nil
}
// sseClient is the HTTP client used for event-stream requests. Its overall
// request timeout and its dial timeout are both set to one million hours, so
// in practice neither cuts a long-running stream short. It keeps at most one
// idle connection per host and never expires idle connections.
//
// NOTE(review): TLS certificate verification is switched off
// (InsecureSkipVerify), presumably because the bridge presents a certificate
// that does not verify against the system roots; confirm this is intended,
// since it permits man-in-the-middle attacks.
var sseClient = &http.Client{
	Timeout: 1000000 * time.Hour,
	Transport: &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
		Proxy:           http.ProxyFromEnvironment,
		DialContext: (&net.Dialer{
			KeepAlive: time.Second * 30,
			Timeout:   1000000 * time.Hour,
		}).DialContext,
		IdleConnTimeout:     0,
		MaxIdleConnsPerHost: 1,
		MaxIdleConns:        5,
	},
}
// httpClient is the HTTP client used for ordinary (non-streaming) requests.
// A whole request may take at most one minute and dialing at most 30 seconds.
// Up to 16 idle connections per host (256 in total) are kept for reuse and
// dropped after ten minutes of idleness.
//
// NOTE(review): TLS certificate verification is switched off
// (InsecureSkipVerify), presumably because the bridge presents a certificate
// that does not verify against the system roots; confirm this is intended,
// since it permits man-in-the-middle attacks.
var httpClient = &http.Client{
	Timeout: time.Minute,
	Transport: &http.Transport{
		TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
		Proxy:           http.ProxyFromEnvironment,
		DialContext: (&net.Dialer{
			KeepAlive: time.Second * 30,
			Timeout:   time.Second * 30,
		}).DialContext,
		IdleConnTimeout:     time.Minute * 10,
		MaxIdleConnsPerHost: 16,
		MaxIdleConns:        256,
	},
}