from Dictionary - Webflux
Webflux
SpringBoot Webflux는 서블릿을 버리고, 비동기로 진행하여 동시성을 최대화하여 가용성을 높히는 데 중점을 둔 프레임워크이다.
기본적으로 Java는 blocking code를 사용한다. Java는 Flow.Publisher을 이용해서 ReactiveStreams를 구현해서 사용할 수 있다.
이에 대한 실제 구현체는 RxJava, Reactor가 있다.
추가로 Tomcat을 버리고 Netty를 채택해서 EventLoop 기반으로 처리하여 가용성을 더 높혔다.
이를 사용하면
- 메모리 가용성이 높아진다.
- 파이프 라인으로 pub/sub 프로세스를 연결하여 처리한다.
Reactive
Java에서 비동기 처리?
- Callback -> Callback Hell에 빠질 수 있음
- Future -> CompletableFuture를 지원하긴 하지만 까다로움
백프레셔
Pub/Sub의 균형이 무너질 때 생기는 형상이다. 소비보다 생성이 많을 때 발생한다. 이는 결국 메모리가 overflow 되고 OutOfMemory로 이어진다.
push
Publisher가 이벤트를 밀어 넣는 형식 -> backpressure 발생할 수 있다.
pull
Subscribe가 요청한 만큼만 전달
Cold vs. Hot
- Cold ->
Subscribe로 이벤트 발현 - Hot ->
Subscriber로 부터 시작하지 않는다.Publisher가Emit하는 것을 기본으로 한다.
RxJava
ReactiveX(Netflix)에서 만들었다.
Publisher
- 한 건의 경우
Single - 데이터가 없거나 한 건이면
Maybe - 한 건 이상일 경우
Observalbe,Flow
RxJava에서 백프레셔 처리 방법
Observable대신Flowable사용 ```Observable
- 1000개 미만 데이터 발행시
- 적은 소스로 OOM 발생이 가능성이 낮을 경우
- GUI 이벤트 처리
Flowable
- 1000개 이상 데이터 발행 시
- 디스크 I/O
- JDBC
- 네트워크 I/O
Backpressure 제어 전략
- MISSING(BackpressureStrategy.MISSING) -> x
- ERROR(BackpressureStrategy.ERROR) -> 발생시
MissingBackpressureException- BUFFER(BackpressureStrategy.BUFFER) -> 소비할 때까지 Queue, OOM 가능성 이씀
- DROP(BackpressureStrategy.DROP) -> 소비하지 못한 데이터 버림
- LATEST(BackpressureStrategy.LATEST) -> 받을 준비가 될 때까지 최신만 유지하고 버림 ```
Reactor
Pivotal에서 만들었다.
Publisher
- 한 건은
Mono - 없거나 여러 건은
Flux
Reactor에서 백프레셔 처리 방법
- 버퍼링 -> OOM 가능성 있음
- DROP ->
onBackpressureDrop()으로 그냥 버려서 최악의 상황을 모면함 - Latest ->
onBackpressureLatest()으로 최근 이벤트만 유지 flatMap()-> 내부 Publisher로 변환하고 Merge하여 소비 속도 조절, 동시성 수준을 조절할 수 있어서 백프레셔 관리 가능(2번 째 파라미터로 동시성 제어)
Operator
Create
- just()
- create()
- fromStream()
- fromIterable()
Sequence Control
- flatMap() ->
map()+ 새로운 Sequence를 생성해서 내보 냄 - concat() -> Publisher의 Sequence를 연결해서 emit
- merge() -> Mono를 합침
- zip() -> 여러 개의 Sequnce에서 emit 된 데이터를 결합
peeking
- doOnNext() -> 처리되기 전 흐름에 영향 없이 무언가 하기 위해서
- doOnSuccess/doOnComplete -> 성공/완료 시
- doOnError -> 에러 종료 시
- doOnCancel -> 취소 시
- doFirst -> 시작 시
- doOnSubscribe -> 구독 시
- doOnRequest/doOnTerminate -> 요청/ 종료 시
- log() -> Singal 이벤트 로깅
Filter
- filter -> Filtering
- take -> 통과 개수 지정
error
- timeout -> timeOut 이내 emit 없으면 에러
- retry -> 주어진 숫자만큼 재구독하도록
ignoreElement
- then -> 작업 완료를 기다림
- thenReturn -> then + return some Mono
- thenMany -> then return + some Flux
- distinct -> 중복 제거
split (Flux -> Flux<Flux>)
- window(int)
- window(Duration)
- buffer
toSynchronous
- blockFirst/blockLast (flux)
- toIterable/toStream (flux)
- block (mono)
- toFuture (mono)
delay
- delay/delayUntil -> 완료 연기
R2DBC ( Reactive Relational DataBase Connectivity )
JDBC 논 블로킹 버전이라고 이해하면 그나마 낫다.
장점
- JDBC에서는 불가능한 논 블로킹 API를 제공한다.
단점
- JDBC와 다르다. (JDBC에서 당연하게 사용하던 것들을 사용할 수 없다.)(caching, lazyLoading, write-behind 등)
- 아직 성숙하지 않아서 공식적인 driver implementation이 없다.
- R2DBC가 JDBC의 next가 되지 않을 가능성이 있다. (ProjectLoom은 JVM base DB Driver를 만들고 있다.) Java19, Loom blogSpot
- JPA와 같은 관계성 매핑이 불가능하며, 캐싱, 영속성 컨텍스트 등을 제공하지 않는다.
QueryDsl
JPA 처럼 queryDsl을 Third party로 지원하지만(QueryDsl도 thirdParty지만) 완벽하지 않다.
에러 사항
- 네이밍 규칙
infobip Querydsl의 기본 네이밍 규칙이 Pascal이다. 이를 덮어 쓰기 위해서 Bean을 지정한다.
@Configuration
public class QuerydslConfig {
@Bean
public NamingStrategy namingStrategy() {
return new NamingStrategy(){
@Override
public String getTableName(Class<?> type) {
return type.getSimpleName();
}
@Override
public String getColumnName(RelationalPersistentProperty property) {
return property.getName().substring(0, 1).toUpperCase() + property.getName().substring(1);
}
};
}
}
infobip querydsl README에서도
Flyway를 쓰던 SqlTemplate을 @Bean으로 두던 하라고 한다.
- 템플릿 설정
@Configuration
public class SqlTemplatesConfig {
private final List<String> driver = Arrays.asList("mysql", "maria");
@Bean
public SQLTemplates sqlTemplates (R2dbcProperties properties) throws SQLException {
Boolean isStorageDB = driver.stream().map(driver -> properties.getUrl().contains(driver)).reduce(false, (p, n) -> p || n);
return isStorageDB ?
MySQLTemplates.builder().escape('\\').quote().newLineToSingleSpace().build() :
H2Templates.builder().quote().escape('\\').newLineToSingleSpace().build();
}
}
이 Template에 따라 Querydsl이 쿼리를 파싱하는 결과가 바뀐다. 뭐 예를 들어
interface BoardQueryDslRepository extends QuerydslR2dbcRepository<Board, String> {
}
@Repository
@RequiredArgsConstructor
@Slf4j
class BoardRepository {
private final BoardQueryDslRepository boardQuery;
private BooleanBuilder condition(BoardType type, Language locale) {
BooleanBuilder builder = new BooleanBuilder();
if(Objects.nonNull(type)) board.type.eq(type);
if(Objects.nonNull(locale)) builder.and(board.locale.eq(locale));
return builder;
}
public Mono<Template> template(BoardType type, Language locale) {
return boardQuery.query(query -> {
return query.select(
new QBoardeDto(
board.boardNo,
board.boardType,
board.locale,
board.contents,
board.lastModifiedDate
)
)
.from(template)
.where(this.condition(templateType, locale));
}).one();
}
}
이렇게 있다면 where 절의 파라미터에 값을 바인딩할 때 '?'으로 네이밍을 잡아서 바인딩하려다 실패를 한다던가 하는 문제가 발생할 수 있다.
내부적으로 MySQLSQLTemplates인지 instanceOf를 해서 파라미터 바인딩하는 것으로 보인다(Maria_r2dbc-connector). 따라서 기존에는 크게 신경 쓰지 않던 것에도
주의를 기울여야 한다.
- MariaConnector에서 지원하지 않아서 Enum을 parameter로 던지면 codec 문제를 일으킨다(QueryDSL 기준) -> 해결 하지 못했다.