sparrow

🧩 Syntax:
package com.bagdouri.targha.service;

import com.bagdouri.targha.client.exchange.BinanceFuturesClient;
import com.bagdouri.targha.dto.Price;
import com.bagdouri.targha.dto.Symbol;
import com.bagdouri.targha.dto.exchange.*;
import com.bagdouri.targha.math.SimpleMovingSequence;
import com.bagdouri.targha.util.ThreadUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Stream;

@Slf4j
@Service
@Profile("sparrow")
public class SparrowTradingService extends AbstractService {

    private static final ExecutorService executorService = Executors.newSingleThreadExecutor(
            new CustomizableThreadFactory("sparrow-loop-"));

    private final BinanceFuturesClient binanceFuturesClient;
    private final Symbol symbol = Symbol.of("BTC", "USDC");
    private final LongAdder priceChanges = new LongAdder();
    private final double quantity = 0.002;
    private final double spread = 10;
    private final double skewRatio = spread / 20.0;
    private final AtomicLong attemptedOrders = new AtomicLong();
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final AlertService alertService;
    private final Collection<OrderResponse> doneOrders = ConcurrentHashMap.newKeySet();
    private final AtomicReference<OrderResponse> lastFilledOrder = new AtomicReference<>();
    private volatile Price lastPrice = null;
    private final AtomicReference<OrderResponse> askOrder = new AtomicReference<>();
    private final AtomicReference<OrderResponse> bidOrder = new AtomicReference<>();
    private final SimpleMovingSequence tradesMovingSequence = new SimpleMovingSequence(20, SimpleMovingSequence.NO_FLAGS);
    private final double tradeAvgLimit = 2.0;

    public SparrowTradingService(
            Environment environment,
            BinanceFuturesClient binanceFuturesClient,
            AlertService alertService) {

        super(environment);
        this.binanceFuturesClient = binanceFuturesClient;
        this.alertService = alertService;
    }

    @Override
    protected void init() throws Exception {
        cancelAllOrders();
        binanceFuturesClient.addAggTradeListener(
                e -> tradesMovingSequence.add(e.isBuyerMaker() ? -1.0 : 1.0));
        ThreadUtils.submitTask("sparrow-loop", symbol.toString(), executorService, this::loop);
    }

    private void cancelAllOrders() {
        binanceFuturesClient.getOpenOrders().stream()
                .filter(order -> order.getSymbol().equals(symbol.getBase() + symbol.getQuote()))
                .forEach(order -> {
                    this.alertService.alert("targha_cycles", "Cancelling order: " + order);
                    log.info("Cancelling order: {}", order);
                    binanceFuturesClient.cancelOrder(symbol.getBase() + symbol.getQuote(), order.getOrderId());
                });
    }

    private void loop() {
        log.info("Starting Sparrow loop");
        try {
            while (!Thread.currentThread().isInterrupted() && !stopped.get()) {
                try {
                    checkOrder("ASK", askOrder, bidOrder);
                    checkOrder("BID", bidOrder, askOrder);

                    if (askOrder.get() == null || bidOrder.get() == null) {
                        Price price = binanceFuturesClient.getPrice(symbol.getBase(), symbol.getQuote());
                        if (price == null) continue;
                        if (!price.equals(lastPrice)) priceChanges.increment();
                        lastPrice = price;

                        double tradeAvg = getTradeAverage();

                        if(tradeAvg < tradeAvgLimit && askOrder.get() == null) {

                            double position = binanceFuturesClient.getPosition(symbol);
                            log.info("Price: {}, Position: {}, Trade avg: {}", price, position, tradeAvg);
                            sendAskOrder(price, position);
                        }
                        if(tradeAvg > -tradeAvgLimit && bidOrder.get() == null) {

                            double position = binanceFuturesClient.getPosition(symbol);
                            log.info("Price: {}, Position: {}, Trade avg: {}", price, position, tradeAvg);
                            sendBidOrder(price, position);
                        }
                    }
                } catch (Exception e) {
                    log.error("Sparrow iteration failed. Sleeping for 1 second", e);
                    ThreadUtils.sleep(Duration.ofSeconds(1L));
                }
            }
        } finally {
            log.info("Sparrow loop stopped");
        }
    }

    private void sendBidOrder(Price price, double position) {
        sendOrder("BID", price, position, bidOrder, true);
    }

    private void sendAskOrder(Price price, double position) {
        sendOrder("ASK", price, position, askOrder, false);
    }

    private void sendOrder(String tag, Price currentPrice, double position, AtomicReference<OrderResponse> order, boolean bid) {

        if(order.get() != null) {
            throw new IllegalStateException("Order already exists: " + order.get());
        }

        double sign = bid ? 1.0 : -1.0;

        double skew = Math.round(position / quantity) * skewRatio;

        double orderPrice = (currentPrice.mid() - sign * spread) - skew;

        log.info("Skew: {}, Position: {}, Initial order price: {}", skew, position, orderPrice);

        NewOrder newOrder = NewOrder.builder()
                .baseCurrency(symbol.getBase())
                .quoteCurrency(symbol.getQuote())
                .type(OrderType.LIMIT)
                .timeInForce(TimeInForce.GTX)
                .quantity(sign * quantity)
                .price(bid ? Math.min(orderPrice, currentPrice.bid()) : Math.max(orderPrice, currentPrice.ask()))
                .leverage(20.0)
                .marginType(MarginType.CROSSED)
                .build();
        alertService.alert("targha_cycles", "Send new " + tag + " order: " + newOrder);
        log.info("Send new {} order: {}", tag, newOrder);

        var orderResponse = binanceFuturesClient.newOrder(newOrder);
        log.info("New {} order response: {}", tag, orderResponse);
        order.set(orderResponse);
    }

    private boolean checkOrder(String tag, AtomicReference<OrderResponse> order, AtomicReference<OrderResponse> otherOrder) {
        var orderResponse = order.get();

        if(orderResponse == null) {
            return false;
        }

        OrderStatus orderStatus = binanceFuturesClient.getOrderStatus(
                orderResponse.getOrderId(), symbol.getBase(), symbol.getQuote(), MarginType.CROSSED);

        if(orderStatus != orderResponse.getStatus()) {
            order.set(binanceFuturesClient.getOrder(
                    orderResponse.getOrderId(), symbol.getBase(), symbol.getQuote(), MarginType.CROSSED));
            this.alertService.alert(
                    "targha_cycles", tag + " order status changed: " + orderResponse);
            log.info("{} order status changed: {}", tag, orderResponse);
        }

        if(orderStatus.isDone()) {
            this.alertService.alert(
                    "targha_cycles", tag + " order is done: " + orderResponse);
            log.info("{} order is done: {}", tag, orderResponse);
            order.set(null);
            doneOrders.add(orderResponse);
            lastFilledOrder.set(orderResponse);

            if(otherOrder.get() != null) {
                log.info("Cancel other order {}", otherOrder.get());
                binanceFuturesClient.cancelOrder(
                        symbol.getBase() + symbol.getQuote(), Long.parseLong(otherOrder.get().getOrderId()));
                otherOrder.set(null);
            }
            return true;
        }
        return false;
    }

    public void stop() {
        stopped.set(true);
    }

    public OrderResponse getOpenAskOrder() {
        return askOrder.get();
    }

    public OrderResponse getOpenBidOrder() {
        return bidOrder.get();
    }

    public List<OrderResponse> getAllOrders() {
        return Stream.concat(
                Stream.of(askOrder, bidOrder).map(AtomicReference::get).filter(Objects::nonNull),
                doneOrders.stream()).toList();
    }

    public OrderResponse getLastFilledOrder() {
        return lastFilledOrder.get();
    }

    public double getPosition() {
        return binanceFuturesClient.getPosition(symbol);
    }

    public double getTradeAverage() {
        return tradesMovingSequence.isFull() ? tradesMovingSequence.getMean() : Double.NaN;
    }

    @Override
    public Object getMonitoring() {
        return new Monitoring(
                lastPrice, priceChanges.sum(), doneOrders.size(), attemptedOrders.get());
    }

    public record Monitoring(
            Price lastPrice, long priceChanges, int doneOrders, long attemptedOrders
    ) {}
}